Linha Spark para JSON

Gostaria de criar um JSON a partir de um dataframe do Spark v.1.6 (usando scala). Eu sei que existe a solução simples de fazer o df.toJSON .

No entanto, meu problema parece um pouco diferente. Considere, por exemplo, um dataframe com as seguintes colunas:

 | A | B | C1 | C2 | C3 | ------------------------------------------- | 1 | test | ab | 22 | TRUE | | 2 | mytest | gh | 17 | FALSE | 

Eu gostaria de ter no final um dataframe com

 | A | B | C | ---------------------------------------------------------------- | 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } | | 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } | 

onde C é um JSON contendo C1 , C2 , C3 . Infelizmente, em tempo de compilation eu não sei como é o dataframe (exceto as colunas A e B que são sempre “fixas”).

Quanto ao motivo pelo qual eu preciso disso: estou usando o Protobuf para enviar os resultados. Infelizmente, meu dataframe às vezes tem mais colunas do que o esperado e eu ainda as enviaria via Protobuf, mas não desejo especificar todas as colunas na definição.

Como posso conseguir isso?

O Spark 2.1 deve ter suporte nativo para esse caso de uso (consulte o # 15354 ).

 import org.apache.spark.sql.functions.to_json df.select(to_json(struct($"c1", $"c2", $"c3"))) 

Primeiro, vamos converter C’s em uma struct :

 val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C")) 

Esta é a estrutura pode ser convertida para JSONL usando toJSON como antes:

 dfStruct.toJSON.collect // Array[String] = Array( // {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, // {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}}) 

Não tenho conhecimento de nenhum método interno que possa converter uma única coluna, mas você pode convertê-lo individualmente e join ou usar seu analisador JSON favorito em uma UDF.

 case class C(C1: String, C2: Int, C3: Boolean) object CJsonizer { import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write implicit val formats = Serialization.formats(org.json4s.NoTypeHints) def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3)) } val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => CJsonizer.toJSON(c1, c2, c3)) df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3")) 

Aqui, nenhum analisador JSON e ele se adapta ao seu esquema:

 import org.apache.spark.sql.functions.{col, concat, concat_ws, lit} df.select( col(df.columns(0)), col(df.columns(1)), concat( lit("{"), concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => { val c = dt._1; val t = dt._2; concat( lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ), col(c), lit(if(t=="StringType") "\""; else "") ) }):_*), lit("}") ) as "C" ).collect()