Como compactar dois (ou mais) DataFrame no Spark

Eu tenho dois DataFrame a e b . a é como

 Column 1 | Column 2 abc | 123 cde | 23 

b é como

 Column 1 1 2 

Eu quero zip b (ou ainda mais) DataFrames que se torna algo como:

 Column 1 | Column 2 | Column 3 abc | 123 | 1 cde | 23 | 2 

Como eu posso fazer isso?

Operação como essa não é suportada por uma API do DataFrame. É possível zipar dois RDDs, mas, para fazê-lo funcionar, você precisa corresponder ao número de partições e ao número de elementos por partição. Assumindo que este é o caso:

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructField, StructType, LongType} val a: DataFrame = sc.parallelize(Seq( ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") // Merge rows val rows = a.rdd.zip(b.rdd).map{ case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} // Merge schemas val schema = StructType(a.schema.fields ++ b.schema.fields) // Create new data frame val ab: DataFrame = sqlContext.createDataFrame(rows, schema) 

Se as condições acima não forem cumpridas, a única opção que vem à mente é adicionar um índice e ingressar:

 def addIndex(df: DataFrame) = sqlContext.createDataFrame( // Add index df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, // Create schema StructType(df.schema.fields :+ StructField("_index", LongType, false)) ) // Add indices val aWithIndex = addIndex(a) val bWithIndex = addIndex(b) // Join and clean val ab = aWithIndex .join(bWithIndex, Seq("_index")) .drop("_index") 

Na implementação do Scala de Dataframes, não há uma maneira simples de concatenar dois dataframes em um. Podemos simplesmente contornar essa limitação, adicionando índices a cada linha dos frameworks de dados. Então, podemos fazer uma junit interna por esses índices. Este é o meu código stub desta implementação:

 val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) aWithId.join(bWithId, "id") 

Um pouco de luz – confira como o Python faz isso!

E quanto ao SQL puro?

 SELECT room_name, sender_nickname, message_id, row_number() over (partition by room_name order by message_id) as message_index, row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index from messages order by room_name, message_id 

Eu sei que o OP estava usando o Scala, mas se, como eu, você precisa saber como fazer isso no pyspark, então tente o código Python abaixo. Assim como a primeira solução do @n0323, ela depende de RDD.zip() e, portanto, falhará se os dois DataFrames não tiverem o mesmo número de partições e o mesmo número de linhas em cada partição.

 from pyspark.sql import Row from pyspark.sql.types import StructType def zipDataFrames(left, right): CombinedRow = Row(*left.columns + right.columns) def flattenRow(row): left = row[0] right = row[1] combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__] return CombinedRow(*combinedVals) zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row)) combinedSchema = StructType(left.schema.fields + right.schema.fields) return zippedRdd.toDF(combinedSchema) joined = zipDataFrames(a, b)