Como converter object rdd para dataframe em faísca

Como posso converter um RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] ) em um Dataframe org.apache.spark.sql.DataFrame . Eu converti um dataframe para o rdd usando .rdd . Depois de processá-lo, eu o quero de volta no dataframe. Como posso fazer isso ?

SqlContext tem um número de methods createDataFrame que criam um DataFrame dado um RDD . Eu imagino que um desses funcionará para o seu contexto.

Por exemplo:

 def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame 

Cria um DataFrame de um RDD contendo Rows usando o esquema fornecido.

Assumindo que seu RDD [row] é chamado de rdd, você pode usar:

 val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF() 

Este código funciona perfeitamente no Spark 2.x com Scala 2.11

Importar classs necessárias

 import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} 

Crie o object SparkSession , aqui está a spark

 val spark: SparkSession = SparkSession.builder.master("local").getOrCreate val sc = spark.sparkContext // Just used to create test RDDs 

Vamos um RDD para torná-lo DataFrame

 val rdd = sc.parallelize( Seq( ("first", Array(2.0, 1.0, 2.1, 5.4)), ("test", Array(1.5, 0.5, 0.9, 3.7)), ("choose", Array(8.0, 2.9, 9.1, 2.5)) ) ) 

Método 1

Usando SparkSession.createDataFrame(RDD obj) .

 val dfWithoutSchema = spark.createDataFrame(rdd) dfWithoutSchema.show() +------+--------------------+ | _1| _2| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

Método 2

Usando SparkSession.createDataFrame(RDD obj) e especificando nomes de coluna.

 val dfWithSchema = spark.createDataFrame(rdd).toDF("id", "vals") dfWithSchema.show() +------+--------------------+ | id| vals| +------+--------------------+ | first|[2.0, 1.0, 2.1, 5.4]| | test|[1.5, 0.5, 0.9, 3.7]| |choose|[8.0, 2.9, 9.1, 2.5]| +------+--------------------+ 

Método 3 (resposta real à pergunta)

Dessa forma, a input rdd deve ser do tipo RDD[Row] .

 val rowsRdd: RDD[Row] = sc.parallelize( Seq( Row("first", 2.0, 7.0), Row("second", 3.5, 2.5), Row("third", 7.0, 5.9) ) ) 

crie o esquema

 val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val1", DoubleType, true)) .add(StructField("val2", DoubleType, true)) 

Agora aplique o rowsRdd e o schema ao createDataFrame()

 val df = spark.createDataFrame(rowsRdd, schema) df.show() +------+----+----+ | id|val1|val2| +------+----+----+ | first| 2.0| 7.0| |second| 3.5| 2.5| | third| 7.0| 5.9| +------+----+----+ 

Suponha que você tenha um DataFrame e deseje fazer alguma modificação nos dados dos campos, convertendo-os em RDD[Row] .

 val aRdd = aDF.map(x=>Row(x.getAs[Long]("id"),x.getAs[List[String]]("role").head)) 

Para converter de volta para o DataFrame partir do RDD , precisamos definir o tipo de estrutura do RDD .

Se o tipo de dados for Long , ele se tornará como LongType na estrutura.

Se String seguida, StringType na estrutura.

 val aStruct = new StructType(Array(StructField("id",LongType,nullable = true),StructField("role",StringType,nullable = true))) 

Agora você pode converter o RDD em DataFrame usando o método createDataFrame .

 val aNamedDF = sqlContext.createDataFrame(aRdd,aStruct) 

Nota: Esta resposta foi originalmente publicada aqui

Estou postando esta resposta porque gostaria de compartilhar detalhes adicionais sobre as opções disponíveis que não encontrei nas outras respostas


Para criar um DataFrame a partir de um RDD de linhas, existem duas opções principais:

1) Como já foi toDF() , você poderia usar toDF() que pode ser importado através da import sqlContext.implicits._ . No entanto, essa abordagem só funciona para os seguintes tipos de RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(fonte: Scaladoc do object SQLContext.implicits )

A última assinatura, na verdade, significa que ela pode funcionar para um RDD de tuplas ou um RDD de classs de casos (porque tuplas e classs de casos são subclasss de scala.Product ).

Então, para usar essa abordagem para um RDD[Row] , você precisa mapeá-lo para um RDD[T <: scala.Product] . Isso pode ser feito mapeando cada linha para uma class de caso customizada ou para uma tupla, como nos snippets de código a seguir:

 val df = rdd.map({ case Row(val1: String, ..., valN: Long) => (val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

ou

 case class MyClass(val1: String, ..., valN: Long = 0L) val df = rdd.map({ case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN) }).toDF("col1_name", ..., "colN_name") 

A principal desvantagem desta abordagem (na minha opinião) é que você deve definir explicitamente o esquema do DataFrame resultante na function map, coluna por coluna. Talvez isso possa ser feito de forma programática, se você não conhecer o esquema com antecedência, mas as coisas podem ficar um pouco confusas lá. Então, alternativamente, existe outra opção:


2) Você pode usar createDataFrame(rowRDD: RDD[Row], schema: StructType) como na resposta aceita, que está disponível no object SQLContext . Exemplo para converter um RDD de um DataFrame antigo:

 val rdd = oldDF.rdd val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema) 

Observe que não há necessidade de definir explicitamente qualquer coluna de esquema. Nós reutilizamos o esquema do antigo DF, que é da class StructType e pode ser facilmente estendido. No entanto, essa abordagem às vezes não é possível e, em alguns casos, pode ser menos eficiente que a primeira.

Aqui está um exemplo simples de converter sua lista em Spark RDD e, em seguida, converter esse Spark RDD em Dataframe.

Por favor, note que eu usei scala REPL do Spark-shell para executar o código a seguir, Aqui sc é uma instância do SparkContext que está implicitamente disponível no Spark-shell. Espero que responda sua pergunta.

 scala> val numList = List(1,2,3,4,5) numList: List[Int] = List(1, 2, 3, 4, 5) scala> val numRDD = sc.parallelize(numList) numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[80] at parallelize at :28 scala> val numDF = numRDD.toDF numDF: org.apache.spark.sql.DataFrame = [_1: int] scala> numDF.show +---+ | _1| +---+ | 1| | 2| | 3| | 4| | 5| +---+ 

Método 1: (Scala)

 val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val df_2 = sc.parallelize(Seq((1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c"))).toDF("x", "y", "z") 

Método 2: (Scala)

 case class temp(val1: String,val3 : Double) val rdd = sc.parallelize(Seq( Row("foo", 0.5), Row("bar", 0.0) )) val rows = rdd.map({case Row(val1:String,val3:Double) => temp(val1,val3)}).toDF() rows.show() 

Método 1: (Python)

 from pyspark.sql import Row l = [('Alice',2)] Person = Row('name','age') rdd = sc.parallelize(l) person = rdd.map(lambda r:Person(*r)) df2 = sqlContext.createDataFrame(person) df2.show() 

Método 2: (Python)

 from pyspark.sql.types import * l = [('Alice',2)] rdd = sc.parallelize(l) schema = StructType([StructField ("name" , StringType(), True) , StructField("age" , IntegerType(), True)]) df3 = sqlContext.createDataFrame(rdd, schema) df3.show() 

Extraiu o valor do object de linha e, em seguida, aplicou a class case para converter rdd para DF

 val temp1 = attrib1.map{case Row ( key: Int ) => s"$key" } val temp2 = attrib2.map{case Row ( key: Int) => s"$key" } case class RLT (id: String, attrib_1 : String, attrib_2 : String) import hiveContext.implicits._ val df = result.map{ s => RLT(s(0),s(1),s(2)) }.toDF 

Em versões mais recentes da faísca (2.0+). Isso também funcionará mesmo sem um sqlcontext disponível.

 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ val spark = SparkSession .builder() .getOrCreate() import spark.implicits._ val dfSchema = Seq("col1", "col2", "col3") rdd.toDF(dfSchema: _*) 
 One needs to create a schema, and attach it to the Rdd. 

Assumindo val spark é um produto de um SparkSession.builder …

  import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.types._ /* Lets gin up some sample data: * As RDD's and dataframes can have columns of differing types, lets make our * sample data a three wide, two tall, rectangle of mixed types. * A column of Strings, a column of Longs, and a column of Doubules */ val arrayOfArrayOfAnys = Array.ofDim[Any](2,3) arrayOfArrayOfAnys(0)(0)="aString" arrayOfArrayOfAnys(0)(1)=0L arrayOfArrayOfAnys(0)(2)=3.14159 arrayOfArrayOfAnys(1)(0)="bString" arrayOfArrayOfAnys(1)(1)=9876543210L arrayOfArrayOfAnys(1)(2)=2.71828 /* The way to convert an anything which looks rectangular, * (Array[Array[String]] or Array[Array[Any]] or Array[Row], ... ) into an RDD is to * throw it into sparkContext.parallelize. * http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext shows * the parallelize definition as * def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) * so in our case our ArrayOfArrayOfAnys is treated as a sequence of ArraysOfAnys. * Will leave the numSlices as the defaultParallelism, as I have no particular cause to change it. */ val rddOfArrayOfArrayOfAnys=spark.sparkContext.parallelize(arrayOfArrayOfAnys) /* We'll be using the sqlContext.createDataFrame to add a schema our RDD. * The RDD which goes into createDataFrame is an RDD[Row] which is not what we happen to have. * To convert anything one tall and several wide into a Row, one can use Row.fromSeq(thatThing.toSeq) * As we have an RDD[somethingWeDontWant], we can map each of the RDD rows into the desired Row type. */ val rddOfRows=rddOfArrayOfArrayOfAnys.map(f=> Row.fromSeq(f.toSeq) ) /* Now to construct our schema. This needs to be a StructType of 1 StructField per column in our dataframe. * https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructField shows the definition as * case class StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty) * Will leave the two default values in place for each of the columns: * nullability as true, * metadata as an empty Map[String,Any] * */ val schema = StructType( StructField("colOfStrings", StringType) :: StructField("colOfLongs" , LongType ) :: StructField("colOfDoubles", DoubleType) :: Nil ) val df=spark.sqlContext.createDataFrame(rddOfRows,schema) /* * +------------+----------+------------+ * |colOfStrings|colOfLongs|colOfDoubles| * +------------+----------+------------+ * | aString| 0| 3.14159| * | bString|9876543210| 2.71828| * +------------+----------+------------+ */ df.show 

Mesmos passos, mas com menos declarações val:

  val arrayOfArrayOfAnys=Array( Array("aString",0L ,3.14159), Array("bString",9876543210L,2.71828) ) val rddOfRows=spark.sparkContext.parallelize(arrayOfArrayOfAnys).map(f=>Row.fromSeq(f.toSeq)) /* If one knows the datatypes, for instance from JDBC queries as to RDBC column metadata: * Consider constructing the schema from an Array[StructField]. This would allow looping over * the columns, with a match statement applying the appropriate sql datatypes as the second * StructField arguments. */ val sf=new Array[StructField](3) sf(0)=StructField("colOfStrings",StringType) sf(1)=StructField("colOfLongs" ,LongType ) sf(2)=StructField("colOfDoubles",DoubleType) val df=spark.sqlContext.createDataFrame(rddOfRows,StructType(sf.toList)) df.show 

Para converter uma matriz [linha] para DataFrame ou dataset, o seguinte funciona elegantemente:

Digamos, o esquema é o StructType para a linha, então

 val rows: Array[Row]=... implicit val encoder = RowEncoder.apply(schema) import spark.implicits._ rows.toDS