Como alterar os tipos de coluna no DataFrame do Spark SQL?

Suponha que eu esteja fazendo algo como:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true")) df.printSchema() root |-- year: string (nullable = true) |-- make: string (nullable = true) |-- model: string (nullable = true) |-- comment: string (nullable = true) |-- blank: string (nullable = true) df.show() year make model comment blank 2012 Tesla S No comment 1997 Ford E350 Go get one now th... 

mas eu realmente queria o year como Int (e talvez transformar algumas outras colunas).

O melhor que eu consegui fazer é

 df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank) org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string] 

que é um pouco complicado.

Eu estou vindo de R, e estou acostumado a ser capaz de escrever, por exemplo

 df2 % mutate(year = year %>% as.integer, make = make %>% toupper) 

Eu provavelmente estou perdendo alguma coisa, já que deveria haver uma maneira melhor de fazer isso em faísca / scala

Desde o Spark versão 1.4, você pode aplicar o método cast com DataType na coluna:

 import org.apache.spark.sql.types.IntegerType val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType)) .drop("year") .withColumnRenamed("yearTmp", "year") 

Se você estiver usando expressões sql, também poderá fazer:

 val df2 = df.selectExpr("cast(year as int) year", "make", "model", "comment", "blank") 

Para mais informações, consulte os documentos: http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

[EDIT: março de 2016: obrigado pelos votos! Embora, na verdade, essa não seja a melhor resposta, acho que as soluções baseadas em withColumn , withColumnRenamed e cast apresentadas por msemelman, Martin Senne e outros são mais simples e mais limpas].

Eu acho que sua abordagem é ok, lembre-se que um Spark DataFrame é um (imutável) RDD de linhas, por isso nunca estamos realmente substituindo uma coluna, apenas criando novo DataFrame cada vez com um novo esquema.

Supondo que você tenha um df original com o seguinte esquema:

 scala> df.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepDelay: string (nullable = true) |-- Distance: string (nullable = true) |-- CRSDepTime: string (nullable = true) 

E algumas UDFs são definidas em uma ou várias colunas:

 import org.apache.spark.sql.functions._ val toInt = udf[Int, String]( _.toInt) val toDouble = udf[Double, String]( _.toDouble) val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) val days_since_nearest_holidays = udf( (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12 ) 

Alterar tipos de colunas ou até mesmo construir um novo DataFrame a partir de outro pode ser escrito assim:

 val featureDf = df .withColumn("departureDelay", toDouble(df("DepDelay"))) .withColumn("departureHour", toHour(df("CRSDepTime"))) .withColumn("dayOfWeek", toInt(df("DayOfWeek"))) .withColumn("dayOfMonth", toInt(df("DayofMonth"))) .withColumn("month", toInt(df("Month"))) .withColumn("distance", toDouble(df("Distance"))) .withColumn("nearestHoliday", days_since_nearest_holidays( df("Year"), df("Month"), df("DayofMonth")) ) .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", "month", "distance", "nearestHoliday") 

que produz:

 scala> df.printSchema root |-- departureDelay: double (nullable = true) |-- departureHour: integer (nullable = true) |-- dayOfWeek: integer (nullable = true) |-- dayOfMonth: integer (nullable = true) |-- month: integer (nullable = true) |-- distance: double (nullable = true) |-- nearestHoliday: integer (nullable = true) 

Isso é muito próximo da sua própria solução. Simplesmente, manter as alterações de tipo e outras transformações como udf val s separadas torna o código mais legível e reutilizável.

Como a operação de cast está disponível para Spark Column ‘s (e como eu pessoalmente não sou a favor de udf como proposto por Svend neste momento), que tal:

 df.select( df("year").cast(IntegerType).as("year"), ... ) 

para transmitir para o tipo solicitado? Como um efeito colateral puro, valores não convertíveis / “conversíveis” nesse sentido, se tornarão null .

Caso você precise disso como um método auxiliar , use:

 object DFHelper{ def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = { df.withColumn( cn, df(cn).cast(tpe) ) } } 

que é usado como:

 import DFHelper._ val df2 = castColumnTo( df, "year", IntegerType ) 

Primeiro, se você quer lançar o tipo

 import org.apache.spark.sql df.withColumn("year", $"year".cast(sql.types.IntegerType)) 

Com o mesmo nome de coluna, a coluna será substituída por uma nova, não é necessário adicionar e excluir.

Em segundo lugar, sobre Scala vs R. o código Scala mais semelhante ao R que eu posso conseguir:

 val df2 = df.select( df.columns.map { case year @ "year" => df(year).cast(IntegerType).as(year) case make @ "make" => functions.upper(df(make)).as(make) case other => df(other) }: _* ) 

Embora o comprimento seja um pouco maior que o de R. Note que o mutate é uma function para o frame de dados R, então o Scala é muito bom em potência expressiva dada sem usar uma function especial.

( df.columns é surpreendentemente uma Array [String] em vez de Array [Column], talvez eles queiram parecer com o dataframe dos pandas do Python.)

Você pode usar selectExpr para torná-lo um pouco mais limpo:

 df.selectExpr("cast(year as int) as year", "upper(make) as make", "model", "comment", "blank") 

Para converter o ano da string para int, você pode adicionar a seguinte opção ao leitor csv: “inferSchema” -> “true”, veja a documentação do DataBricks

Então, isso só funciona se você estiver gravando problemas em um driver jdbc como o sqlserver, mas é realmente útil para erros que você encontrará com syntax e tipos.

 import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect} import org.apache.spark.sql.jdbc.JdbcType val SQLServerDialect = new JdbcDialect { override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver") override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR)) case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT)) case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT)) case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE)) case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL)) case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER)) case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY)) case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE)) case DateType => Some(JdbcType("DATE", java.sql.Types.DATE)) // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC)) case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL)) case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC") } } JdbcDialects.registerDialect(SQLServerDialect) 

Código Java para modificar o tipo de dados do DataFrame de String para Integer

 df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType)) 

Ele simplesmente converterá o tipo de dados (String) existente em Integer.

as respostas sugerindo usar casting, FYI, o método de casting em faísca 1.4.1 está quebrado.

por exemplo, um dataframe com uma coluna de string com valor “8182175552014127960” quando convertido em bigint tem valor “8182175552014128100”

  df.show +-------------------+ | a| +-------------------+ |8182175552014127960| +-------------------+ df.selectExpr("cast(a as bigint) a").show +-------------------+ | a| +-------------------+ |8182175552014128100| +-------------------+ 

Tivemos que enfrentar um grande problema antes de encontrar esse bug porque tínhamos colunas bigint em produção.

 df.select($"long_col".cast(IntegerType).as("int_col")) 

Esse método descartará a coluna antiga e criará novas colunas com os mesmos valores e novos tipos de dados. Meus tipos de dados originais quando o DataFrame foi criado foram: –

 root |-- id: integer (nullable = true) |-- flag1: string (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag3: string (nullable = true) 

Depois disso, corri o seguinte código para alterar o tipo de dados: –

 df=df.withColumnRenamed(,) // This was done for both flag1 and flag3 df=df.withColumn(,df.col().cast()).drop() 

Depois disso, meu resultado acabou sendo:

 root |-- id: integer (nullable = true) |-- flag2: string (nullable = true) |-- name: string (nullable = true) |-- flag1: boolean (nullable = true) |-- flag3: boolean (nullable = true) 

Você pode usar o código abaixo.

 df.withColumn("year", df("year").cast(IntegerType)) 

Que irá converter a coluna do ano para a coluna IntegerType .

  val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd //Schema to be applied to the table val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType) val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates() 

Pode-se alterar o tipo de dados de uma coluna usando cast in spark sql. nome da tabela é a tabela e tem duas colunas apenas coluna1 e coluna2 e coluna1 o tipo de dados deve ser alterado. ex-spark.sql (“selecione cast (coluna1 como Double) column1NewName, column2 da tabela”) No lugar de double escreva seu tipo de dados.

Outra maneira:

 // Generate a simple dataset containing five values and convert int to string type val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value") 

Gere um dataset simples contendo cinco valores e converta int em tipo string :

 val df = spark.range(5).select( col("id").cast("string") )