Erro de codificador ao tentar mapear a linha do dataframe para a linha atualizada

Quando estou tentando fazer a mesma coisa no meu código como mencionado abaixo

dataframe.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) }) 

Tomei a referência acima a partir daqui: Scala: Como posso replace o valor em Dataframs usando scala Mas estou recebendo erro de codificador como

Não é possível encontrar o codificador para o tipo armazenado em um dataset. Tipos primitivos (Int, S tring, etc) e Productos (classs de casos) são suportados importando spark.im plicits._ O suporte para serializar outros tipos será adicionado em versões futuras.

Nota: estou usando faísca 2.0!

Não há nada de inesperado aqui. Você está tentando usar o código que foi gravado com o Spark 1.x e não é mais suportado no Spark 2.0:

  • em 1.x DataFrame.map é ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • em 2.x O Dataset[Row].map é ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

Para ser honesto, não fazia muito sentido em 1.x também. Independente da versão, você pode simplesmente usar a API do DataFrame :

 import org.apache.spark.sql.functions.{when, lower} val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make")) 

Se você realmente quiser usar o map , use o Dataset tipado estaticamente:

 import spark.implicits._ case class Record(year: Int, make: String, model: String) df.as[Record].map { case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S") case rec => rec } 

ou pelo menos retornar um object que terá codificador implícito:

 df.map { case Row(year: Int, make: String, model: String) => (year, if(make.toLowerCase == "tesla") "S" else make, model) } 

Finalmente, se por algum motivo completamente louco você realmente quiser mapear o Dataset[Row] você deve fornecer o codificador necessário:

 import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.Row // Yup, it would be possible to reuse df.schema here val schema = StructType(Seq( StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType) )) val encoder = RowEncoder(schema) df.map { case Row(year, make: String, model) if make.toLowerCase == "tesla" => Row(year, "S", model) case row => row } (encoder) 

Para o cenário em que o esquema do dataframe é conhecido com antecedência, a resposta dada por @ zero323 é a solução

mas para cenário com esquema dynamic / ou passando vários dataframe para uma function genérica: O código a seguir funcionou para nós durante a migration do 1.6.1 do 2.2.0

 import org.apache.spark.sql.Row val df = Seq( (2012, "Tesla", "S"), (1997, "Ford", "E350"), (2015, "Chevy", "Volt") ).toDF("year", "make", "model") val data = df.rdd.map(row => { val row1 = row.getAs[String](1) val make = if (row1.toLowerCase == "tesla") "S" else row1 Row(row(0),make,row(2)) }) 

este código é executado em ambas as versões da faísca.

desvantagem: a otimização fornecida pela ignição no dataframe / datasets não será aplicada.