Conjunto de dados do Spark 2.0 vs DataFrame

começando com faísca 2.0.1 Eu tenho algumas perguntas. Eu li muita documentação, mas até agora não consegui encontrar respostas suficientes:

  • Qual é a diferença entre
    • df.select("foo")
    • df.select($"foo")
  • eu entendi corretamente que
    • myDataSet.map(foo.someVal) é typesafe e não será convertido em RDD mas permanece na representação DataSet / sem sobrecarga adicional (desempenho sábio para 2.0.0)
  • todos os outros comandos, por exemplo, select, .. são apenas açúcar sintático. Eles não são seguros e um mapa pode ser usado no lugar. Como eu poderia df.select("foo") type-safe sem uma declaração de mapa?
    • Por que devo usar um UDF / UADF em vez de um mapa (supondo que o mapa permaneça na representação do dataset)?

   
  1. Diferença entre df.select("foo") e df.select($"foo") é assinatura. O primeiro leva pelo menos um String , o último um zero ou mais Columns . Não há diferença prática além disso.
  2. myDataSet.map(foo.someVal) tipo verifica, mas como qualquer operação de Dataset usa RDD de objects, e comparado a operações DataFrame , há uma sobrecarga significativa. Vamos dar uma olhada em um exemplo simples:

     case class FooBar(foo: Int, bar: String) val ds = Seq(FooBar(1, "x")).toDS ds.map(_.foo).explain 
     == Physical Plan == *SerializeFromObject [input[0, int, true] AS value#123] +- *MapElements , obj#122: int +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar +- LocalTableScan [foo#117, bar#118] 

    Como você pode ver, este plano de execução requer access a todos os campos e tem que DeserializeToObject .

  3. Não. Em geral, outros methods não são açúcar sintático e geram um plano de execução significativamente diferente. Por exemplo:

     ds.select($"foo").explain 
     == Physical Plan == LocalTableScan [foo#117] 

    Comparado com o plano mostrado antes que ele possa acessar a coluna diretamente. Não é tanto uma limitação da API, mas um resultado de uma diferença na semântica operacional.

  4. Como eu poderia df.select (“foo”) type-safe sem uma declaração de mapa?

    Não existe tal opção. Enquanto as colunas digitadas permitem que você transforme estaticamente o Dataset em outro Dataset tipado estaticamente:

     ds.select($"bar".as[Int]) 

    não há tipo seguro. Há outras tentativas de include operações otimizadas seguras de tipo, como agregações digitadas , mas essa API experimental.

  5. Por que devo usar um UDF / UADF em vez de um mapa

    É completamente com você. Cada estrutura de dados distribuída no Spark fornece suas próprias vantagens e desvantagens (consulte, por exemplo, o Spark UDAF com ArrayType como problemas de desempenho do bufferSchema ).

Pessoalmente, acho Dataset tipado para ser o menos útil:

  • Não forneça o mesmo intervalo de otimizações que o Dataset[Row] (embora eles compartilhem o formato de armazenamento e algumas otimizações de planos de execução não se beneficiem totalmente da geração de código ou armazenamento fora do heap) nem access a todos os resources analíticos do DataFrame .

  • Transformações digitadas são checkboxs pretas e criam uma barreira de análise para o otimizador. Por exemplo, seleções (filtros) não podem ser transferidas por meio de transformação digitada:

     ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain 
     == Physical Plan == *Filter (foo#133 = 1) +- *Filter .apply +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- LocalTableScan [foo#133, bar#134] 

    Comparado com:

     ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain 
     == Physical Plan == *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))]) +- Exchange hashpartitioning(foo#133, 200) +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))]) +- *Filter (foo#133 = 1) +- LocalTableScan [foo#133, bar#134] 

    Isso afeta resources como pushdown de predicados ou pushdown de projeção.

  • Não são tão flexíveis quanto os RDDs com apenas um pequeno subconjunto de tipos suportados nativamente.

  • A “segurança de tipo” com Encoders é disputável quando o Dataset é convertido usando as método. Como a forma de dados não é codificada usando uma assinatura, um compilador só pode verificar a existência de um Encoder .

Perguntas relacionadas:

  • Executar uma junit digitada no Scala com conjuntos de dados do Spark
  • Spark 2.0 DataSets groupByKey e divisão de operação e tipo de segurança

O Dataset Spark é muito mais poderoso que o Dataframe Spark. Exemplo pequeno – você só pode criar Dataframe of Row , Tuple ou qualquer tipo de dados primitivo, mas o Dataset lhe dá o poder de criar Dataset de qualquer tipo não primitivo. ou seja, você pode literalmente criar um Dataset de Dataset do tipo de object.

Ex:

 case class Employee(id:Int,name:String) Dataset[Employee] // is valid Dataframe[Employee] // is invalid