Como dividir um dataframe em frameworks de dados com os mesmos valores de coluna?

Usando Scala, como posso dividir dataFrame em vários dataFrame (seja matriz ou coleção) com o mesmo valor de coluna. Por exemplo, eu quero dividir o seguinte DataFrame:

ID Rate State 1 24 AL 2 35 MN 3 46 FL 4 34 AL 5 78 MN 6 99 FL 

para:

dataset 1

 ID Rate State 1 24 AL 4 34 AL 

dataset 2

 ID Rate State 2 35 MN 5 78 MN 

dataset 3

 ID Rate State 3 46 FL 6 99 FL 

Você pode coletar valores de estado exclusivos e simplesmente mapear o array resultante:

 val states = df.select("State").distinct.collect.flatMap(_.toSeq) val byStateArray = states.map(state => df.where($"State" <=> state)) 

ou mapear:

 val byStateMap = states .map(state => (state -> df.where($"State" <=> state))) .toMap 

A mesma coisa em Python:

 from itertools import chain from pyspark.sql.functions import col states = chain(*df.select("state").distinct().collect()) # PySpark 2.3 and later # In 2.2 and before col("state") == state) # should give the same outcome, ignoring NULLs # if NULLs are important # (lit(state).isNull() & col("state").isNull()) | (col("state") == state) df_by_state = {state: df.where(col("state").eqNullSafe(state)) for state in states} 

O problema óbvio aqui é que requer uma varredura de dados completa para cada nível, portanto, é uma operação cara. Se você está procurando uma maneira de simplesmente dividir a saída, veja também Como dividir um RDD em dois ou mais RDDs?

Em particular, você pode escrever o Dataset particionado pela coluna de interesse:

 val path: String = ??? df.write.partitionBy("State").parquet(path) 

e leia de volta, se necessário:

 // Depend on partition prunning for { state <- states } yield spark.read.parquet(path).where($"State" === state) // or explicitly read the partition for { state <- states } yield spark.read.parquet(s"$path/State=$state") 

Dependendo do tamanho dos dados, o número de níveis do nível de divisão, armazenamento e persistência da input pode ser mais rápido ou mais lento do que vários filtros.

É muito simples (se a versão da faísca for 2) se você fizer o dataframe como uma tabela temporária.

 df1.createOrReplaceTempView("df1") 

E agora você pode fazer as consultas

 var df2 = spark.sql("select * from df1 where state = 'FL'") var df3 = spark.sql("select * from df1 where state = 'MN'") var df4 = spark.sql("select * from df1 where state = 'AL'") 

Agora você tem o df2, df3, df4. Se você quiser tê-los como lista, você pode usar,

 df2.collect() df3.collect() 

ou até mesmo mapear / filtrar a function. Consulte https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes

Cinza