O que está acontecendo de errado com `unionAll` do Spark` DataFrame`?

Usando o Spark 1.5.0 e fornecido o código a seguir, espero unionAll para união DataFrame s com base no nome da coluna. No código, estou usando algum FunSuite para passar no SparkContext sc :

 object Entities { case class A (a: Int, b: Int) case class B (b: Int, a: Int) val as = Seq( A(1,3), A(2,4) ) val bs = Seq( B(5,3), B(6,4) ) } class UnsortedTestSuite extends SparkFunSuite { configuredUnitTest("The truth test.") { sc => val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val aDF = sc.parallelize(Entities.as, 4).toDF val bDF = sc.parallelize(Entities.bs, 4).toDF aDF.show() bDF.show() aDF.unionAll(bDF).show } } 

Saída:

 +---+---+ | a| b| +---+---+ | 1| 3| | 2| 4| +---+---+ +---+---+ | b| a| +---+---+ | 5| 3| | 6| 4| +---+---+ +---+---+ | a| b| +---+---+ | 1| 3| | 2| 4| | 5| 3| | 6| 4| +---+---+ 

Por que o resultado contém colunas “b” e “a” intermisturadas , em vez de alinhar as bases de colunas nos nomes das colunas? Soa como um bug sério !

Não parece um bug. O que você vê é um comportamento SQL padrão e todos os principais RDMBS, incluindo PostgreSQL , MySQL , Oracle e MS SQL, se comportam exatamente da mesma maneira. Você encontrará exemplos do SQL Fiddle vinculados a nomes.

Para citar o manual do PostgreSQL :

Para calcular a união, interseção ou diferença de duas consultas, as duas consultas devem ser “compatíveis com união”, o que significa que elas retornam o mesmo número de colunas e as colunas correspondentes têm tipos de dados compatíveis.

Nomes de colunas, excluindo a primeira tabela na operação de conjunto, são simplesmente ignorados.

Esse comportamento vem diretamente da Álgebra Relacional, onde o bloco de construção básico é uma tupla. Como as tuplas são ordenadas, uma união de dois conjuntos de tuplas é equivalente (ignorando a manipulação de duplicatas) à saída que você obtém aqui.

Se você quiser combinar usando nomes, você pode fazer algo parecido com isto

 import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.col def unionByName(a: DataFrame, b: DataFrame): DataFrame = { val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq a.select(columns: _*).unionAll(b.select(columns: _*)) } 

Para verificar ambos os nomes e tipos, deve ser suficiente replace as columns por:

 a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq 

Esse problema está sendo corrigido na spark2.3. Eles estão adicionando suporte a unionByName no dataset.

 https://issues.apache.org/jira/browse/SPARK-21043 

Conforme discutido no SPARK-9813 , parece que, contanto que os tipos de dados e o número de colunas sejam os mesmos entre os frameworks, a operação unionAll funcionará. Por favor, veja os comentários para discussão adicional.