Igualdade de class de caso no Apache Spark

Por que a correspondência de padrões no Spark não funciona da mesma maneira que no Scala? Veja o exemplo abaixo … A function f() tenta combinar com o padrão na class, o que funciona no Scala REPL, mas falha no Spark e resulta em todos os “???”. f2() é uma solução alternativa que obtém o resultado desejado no Spark usando .isInstanceOf() , mas eu entendo que isso seja ruim no Scala.

Qualquer ajuda sobre padrão correspondente à maneira correta neste cenário no Spark seria muito apreciada.

 abstract class a extends Serializable {val a: Int} case class b(a: Int) extends a case class bNull(a: Int=0) extends a val x: List[a] = List(b(0), b(1), bNull()) val xRdd = sc.parallelize(x) 

tentativa de correspondência de padrões que funciona no Scala REPL, mas falha no Spark

 def f(x: a) = x match { case b(n) => "b" case bNull(n) => "bnull" case _ => "???" } 

solução alternativa que funciona no Spark, mas é uma forma ruim (eu acho)

 def f2(x: a) = { if (x.isInstanceOf[b]) { "b" } else if (x.isInstanceOf[bNull]) { "bnull" } else { "???" } } 

Ver resultados

 xRdd.map(f).collect //does not work in Spark // result: Array("???", "???", "???") xRdd.map(f2).collect // works in Spark // resut: Array("b", "b", "bnull") x.map(f(_)) // works in Scala REPL // result: List("b", "b", "bnull") 

Versões usadas … Os resultados do Spark são executados em spark-shell (Spark 1.6 no AWS EMR-4.3) Scala REPL no SBT 0.13.9 (Scala 2.10.5)

Este é um problema conhecido com o Spark REPL. Você pode encontrar mais detalhes no SPARK-2620 . Ela afeta várias operações no Spark REPL, incluindo a maioria das transformações nos PairwiseRDDs . Por exemplo:

 case class Foo(x: Int) val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2)) foos.distinct.size // Int = 2 val foosRdd = sc.parallelize(foos, 4) foosRdd.distinct.count // Long = 4 foosRdd.map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1)) foosRdd.first == foos.head // Boolean = false Foo.unapply(foosRdd.first) == Foo.unapply(foos.head) // Boolean = true 

O que torna ainda pior é que os resultados dependem da distribuição de dados:

 sc.parallelize(foos, 1).distinct.count // Long = 2 sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect // Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2)) 

A coisa mais simples que você pode fazer é definir e empacotar as classs de casos requeridas fora do REPL. Qualquer código enviado diretamente usando o spark-submit deve funcionar.

No Scala 2.11+ você pode criar um pacote diretamente no REPL com paste -raw .

 scala> :paste -raw // Entering paste mode (ctrl-D to finish) package bar case class Bar(x: Int) // Exiting paste mode, now interpreting. scala> import bar.Bar import bar.Bar scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect res1: Array[bar.Bar] = Array(Bar(1), Bar(2))