Como armazenar objects personalizados no DataSet?

De acordo com a introdução dos conjuntos de dados do Spark :

À medida que aguardamos com expectativa o Spark 2.0, planejamos algumas melhorias interessantes nos conjuntos de dados, especificamente: … Encoders personalizados – embora atualmente geremos automaticamente encoders para uma grande variedade de tipos, gostaríamos de abrir uma API para objects personalizados.

e tenta armazenar o tipo personalizado em um lead do Dataset para o seguinte erro, como:

Não é possível encontrar o codificador para o tipo armazenado em um dataset. Tipos primitivos (Int, String, etc) e Tipos de produtos (classs de casos) são suportados importando sqlContext.implicits._ O suporte para serializar outros tipos será adicionado em versões futuras

ou:

Java.lang.UnsupportedOperationException: Nenhum codificador encontrado para ….

Existem soluções alternativas?


Observe que essa questão existe apenas como um ponto de input para uma resposta do Wiki da Comunidade. Sinta-se à vontade para atualizar / melhorar tanto a pergunta quanto a resposta.

Atualizar

Esta resposta ainda é válida e informativa, embora as coisas agora sejam melhores desde a versão 2.2 / 2.3, que adiciona suporte embutido ao codificador para Set , Seq , Map , Date , Timestamp e BigDecimal . Se você mantiver os tipos com apenas as classs de casos e os tipos usuais de Scala, você deve estar bem apenas com o implícito em SQLImplicits .


Infelizmente, praticamente nada foi adicionado para ajudar com isso. Procurar por @since 2.0.0 em Encoders.scala ou SQLImplicits.scala encontra principalmente coisas relacionadas a tipos primitivos (e alguns ajustes de classs de casos). Então, a primeira coisa a dizer: atualmente, não há um suporte realmente bom para codificadores de class personalizados . Com isso fora do caminho, o que se segue são alguns truques que fazem um trabalho tão bom quanto podemos esperar, dado o que temos atualmente à nossa disposição. Como um aviso prévio: isso não funcionará perfeitamente e eu farei o meu melhor para tornar todas as limitações claras e diretas.

Qual é exatamente o problema

Quando você deseja criar um dataset, o Spark “requer um codificador (para converter um object JVM do tipo T para e da representação interna do Spark SQL) que geralmente é criado automaticamente por meio de implícitos de um SparkSession ou pode ser criado explicitamente chamando methods em Encoders “(retirado dos documentos em createDataset ). Um codificador terá o formato Encoder[T] onde T é o tipo que você está codificando. A primeira sugestão é adicionar import spark.implicits._ (que fornece esses encoders implícitos) e a segunda sugestão é passar explicitamente no encoder implícito usando esse conjunto de funções relacionadas ao encoder.

Não há codificador disponível para as classs regulares, portanto

 import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

lhe dará o seguinte erro de tempo de compilation implícito relacionado:

Não é possível encontrar o codificador para o tipo armazenado em um dataset. Tipos primitivos (Int, String, etc) e Tipos de produtos (classs de casos) são suportados importando sqlContext.implicits._ O suporte para serializar outros tipos será adicionado em versões futuras

No entanto, se você include qualquer tipo que você tenha usado para obter o erro acima em alguma class que estenda o Product , o erro confusamente será adiado para o tempo de execução.

 import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3)))) 

Compila muito bem, mas falha em tempo de execução com

java.lang.UnsupportedOperationException: Nenhum codificador encontrado para MyObj

A razão para isso é que os codificadores que o Spark cria com os implícitos são, na verdade, feitos apenas em tempo de execução (via releitura de scala). Nesse caso, todas as verificações do Spark em tempo de compilation são que a class mais externa estende o Product (que todas as classs de caso) e somente percebe no tempo de execução que ainda não sabe o que fazer com MyObj (o mesmo problema ocorre se eu tentar para fazer um Dataset[(Int,MyObj)] – O Spark aguarda até o tempo de execução para barf em MyObj ). Estes são problemas centrais que estão em extrema necessidade de serem corrigidos:

  • algumas classs que estendem a compilation do Product apesar de sempre falharem em tempo de execução e
  • não há como transmitir encoders personalizados para tipos nesteds (não tenho como alimentar um codificador Spark para apenas MyObj modo que ele saiba como codificar Wrap[MyObj] ou (Int,MyObj) ).

É só usar kryo

A solução que todos sugerem é usar o codificador kryo .

 import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

Isso fica muito tedioso rápido embora. Especialmente se o seu código está manipulando todos os tipos de conjuntos de dados, junit, agrupamento, etc. Você acaba acumulando um monte de implícitos extras. Então, por que não apenas fazer um implícito que faz tudo isso automaticamente?

 import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) 

E agora, parece que eu posso fazer quase tudo o que eu quiser (o exemplo abaixo não funciona na spark-shell onde spark.implicits._ é automaticamente importado)

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom! 

Ou quase. O problema é que o uso do kryo leva o Spark a armazenar cada linha do dataset como um object binário simples. Para map , filter , foreach isso é suficiente, mas para operações como join , o Spark realmente precisa que elas sejam separadas em colunas. Inspecionando o esquema para d2 ou d3 , você vê que há apenas uma coluna binária:

 d2.printSchema // root // |-- value: binary (nullable = true) 

Solução parcial para tuplas

Então, usando a mágica dos implícitos no Scala (mais em 6.26.3 Sobrecarregando a Resolução ), eu posso fazer uma série de implícitos que farão um trabalho tão bom quanto possível, pelo menos para as tuplas, e funcionará bem com os implícitos existentes:

 import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these 

Então, armado com estes implícitos, eu posso fazer o meu exemplo acima do trabalho, embora com alguma renomeação de coluna

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") 

Ainda não descobri como obter os nomes de tupla esperados ( _1 , _2 , …) por padrão sem renomeá-los – se alguém quiser brincar com isso, é aí que o nome "value" é introduzido e é aqui que os nomes das tuplas são geralmente adicionados. No entanto, o ponto chave é que agora tenho um esquema estruturado legal:

 d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) 

Então, em resumo, esta solução alternativa:

  • nos permite obter colunas separadas para as tuplas (assim podemos nos juntar nas tuplas novamente, yay!)
  • podemos novamente confiar nos implícitos (portanto, não há necessidade de passar em kryo por todo o lugar)
  • é quase totalmente compatível com a import spark.implicits._ (com alguns renomeando envolvidos)
  • não nos permite juntar-nos nas colunas binárias serializadas com kyro , quanto mais em campos que possam ter
  • tem o efeito colateral desagradável de renomear algumas das colunas da tupla para “valor” (se necessário, isso pode ser desfeito convertendo .toDF , especificando novos nomes de coluna e convertendo de volta para um dataset – e os nomes de esquema parecem ser preservados através de joins, onde são mais necessários).

Solução parcial para aulas em geral

Este é menos agradável e não tem boa solução. No entanto, agora que temos a solução de tupla acima, eu tenho um palpite de que a solução de conversão implícita de outra resposta será um pouco menos dolorosa também, já que você pode converter suas classs mais complexas em tuplas. Então, depois de criar o dataset, você provavelmente renomeia as colunas usando a abordagem de dataframe. Se tudo correr bem, isso é realmente uma melhoria, já que agora posso realizar junções nos campos das minhas aulas. Se eu tivesse acabado de usar um serializador kryo binário plano que não teria sido possível.

Aqui está um exemplo que faz um pouco de tudo: Eu tenho uma class MyObj que possui campos dos tipos Int , java.util.UUID e Set[String] . O primeiro cuida de si mesmo. O segundo, embora eu pudesse serializar usando kryo seria mais útil se fosse armazenado como uma String (já que UUID são geralmente algo que eu quero juntar). O terceiro realmente pertence apenas a uma coluna binária.

 class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3) 

Agora, posso criar um dataset com um esquema interessante usando esse mecanismo:

 val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded] 

E o esquema mostra-me colunas com os nomes certos e com as duas primeiras duas coisas que posso juntar contra.

 d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true) 
  1. Usando codificadores genéricos.

    Existem dois codificadores genéricos disponíveis por enquanto, kryo e javaSerialization onde o último é explicitamente descrito como:

    extremamente ineficiente e só deve ser usado como último recurso.

    Assumindo a seguinte class

     class Bar(i: Int) { override def toString = s"bar $i" def bar = i } 

    você pode usar esses codificadores adicionando o codificador implícito:

     object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] } 

    que pode ser usado em conjunto da seguinte forma:

     object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } } 

    Ele armazena objects como uma coluna binary , portanto, quando convertido para DataFrame você obtém o seguinte esquema:

     root |-- value: binary (nullable = true) 

    Também é possível codificar tuplas usando o codificador kryo para um campo específico:

     val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary] 

    Por favor, note que nós não dependemos de codificadores implícitos aqui, mas passamos pelo codificador explicitamente, então isso provavelmente não funcionará com o método toDS .

  2. Usando conversões implícitas:

    Forneça conversões implícitas entre a representação que pode ser codificada e a class personalizada, por exemplo:

     object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } } 

Perguntas relacionadas:

  • Como criar um codificador para o construtor do tipo Option, por exemplo, Option [Int]?

Os codificadores funcionam mais ou menos da mesma forma no Spark2.0 . E Kryo ainda é a escolha de serialization recomendada.

Você pode olhar o exemplo a seguir com spark-shell

 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. 

Até agora] não havia appropriate encoders no escopo atual, de modo que nossas pessoas não foram codificadas como valores binary . Mas isso vai mudar quando fornecermos alguns codificadores implicit usando a serialização do Kryo .

 // Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old. 

No caso da class Java Bean, isso pode ser útil

 import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass]) 

Agora você pode simplesmente ler o dataFrame como DataFrame personalizado

 dataFrame.as[MyClass] 

Isso criará um codificador de class personalizado e não um binário.

Você pode usar o UDTRegistration e, em seguida, Classes de casos, Tuplas, etc … todos funcionam corretamente com o seu tipo definido pelo usuário!

Digamos que você queira usar um Enum personalizado:

 trait CustomEnum { def value:String } case object Foo extends CustomEnum { val value = "F" } case object Bar extends CustomEnum { val value = "B" } object CustomEnum { def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get } 

Registre-se assim:

 // First define a UDT class for it: class CustomEnumUDT extends UserDefinedType[CustomEnum] { override def sqlType: DataType = org.apache.spark.sql.types.StringType override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value) // Note that this will be a UTF8String type override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString) override def userClass: Class[CustomEnum] = classOf[CustomEnum] } // Then Register the UDT Class! // NOTE: you have to put this file into the org.apache.spark package! UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName) 

Então USE IT!

 case class UsingCustomEnum(id:Int, en:CustomEnum) val seq = Seq( UsingCustomEnum(1, Foo), UsingCustomEnum(2, Bar), UsingCustomEnum(3, Foo) ).toDS() seq.filter(_.en == Foo).show() println(seq.collect()) 

Digamos que você queira usar um registro polimórfico:

 trait CustomPoly case class FooPoly(id:Int) extends CustomPoly case class BarPoly(value:String, secondValue:Long) extends CustomPoly 

… e o uso assim:

 case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Você pode escrever um UDT personalizado que codifica tudo para bytes (estou usando a serialização java aqui, mas provavelmente é melhor instrumentar o contexto Kryo do Spark).

Primeiro defina a class UDT:

 class CustomPolyUDT extends UserDefinedType[CustomPoly] { val kryo = new Kryo() override def sqlType: DataType = org.apache.spark.sql.types.BinaryType override def serialize(obj: CustomPoly): Any = { val bos = new ByteArrayOutputStream() val oos = new ObjectOutputStream(bos) oos.writeObject(obj) bos.toByteArray } override def deserialize(datum: Any): CustomPoly = { val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]]) val ois = new ObjectInputStream(bis) val obj = ois.readObject() obj.asInstanceOf[CustomPoly] } override def userClass: Class[CustomPoly] = classOf[CustomPoly] } 

Então registre-o:

 // NOTE: The file you do this in has to be inside of the org.apache.spark package! UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName) 

Então você pode usá-lo!

 // As shown above: case class UsingPoly(id:Int, poly:CustomPoly) Seq( UsingPoly(1, new FooPoly(1)), UsingPoly(2, new BarPoly("Blah", 123)), UsingPoly(3, new FooPoly(1)) ).toDS polySeq.filter(_.poly match { case FooPoly(value) => value == 1 case _ => false }).show() 

Meus exemplos serão em Java, mas não imagino que seja difícil se adaptar ao Scala.

Eu tenho conseguido converter RDD para Dataset usando spark.createDataset e Encoders.bean contanto que Fruit seja um Java Bean simples.

Etapa 1: crie o Java Bean simples.

 public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out } 

Eu ficaria com classs com tipos primitivos e String como campos antes que o pessoal do DataBricks reforçasse seus Encoders. Se você tiver uma class com object nested, crie outro Java Bean simples com todos os seus campos achatados, para poder usar transformações RDD para mapear o tipo complexo para o mais simples. Claro que é um pouco de trabalho extra, mas imagino que ajudará muito no desempenho trabalhando com um esquema simples.

Etapa 2: obtenha seu dataset do RDD

 SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD fruitJavaRDD = jsc.parallelize(fruitList); RDD fruitRDD = fruitJavaRDD.rdd(); Encoder fruitBean = Encoders.bean(Fruit.class); Dataset fruitDataset = spark.createDataset(rdd, bean); 

E voila! Espuma, enxaguar, repita.

Para aqueles que podem na minha situação, também coloco minha resposta aqui.

Para ser específico,

  1. Eu estava lendo ‘Set typed data’ do SQLContext. O formato de dados original é o DataFrame.

    val sample = spark.sqlContext.sql("select 1 as a, collect_set(1) as b limit 1") sample.show()

    +---+---+ | a| b| +---+---+ | 1|[1]| +---+---+

  2. Em seguida, converta-o em RDD usando rdd.map () com o tipo mutable.WrappedArray.

    sample .rdd.map(r => (r.getInt(0), r.getAs[mutable.WrappedArray[Int]](1).toSet)) .collect() .foreach(println)

    Resultado:

    (1,Set(1))