Definindo uma UDF que aceita uma matriz de objects em um Spark DataFrame?

Ao trabalhar com os DataFrames do Spark, as Funções Definidas pelo Usuário (UDFs) são necessárias para mapear dados em colunas. As UDFs exigem que os tipos de argumentos sejam explicitamente especificados. No meu caso, eu preciso manipular uma coluna que é composta de matrizes de objects e não sei qual tipo usar. Aqui está um exemplo:

import sqlContext.implicits._ // Start with some data. Each row (here, there's only one row) // is a topic and a bunch of subjects val data = sqlContext.read.json(sc.parallelize(Seq( """ |{ | "topic" : "pets", | "subjects" : [ | {"type" : "cat", "score" : 10}, | {"type" : "dog", "score" : 1} | ] |} """))) 

É relativamente simples usar o org.apache.spark.sql.functions para executar operações básicas nos dados nas colunas

 import org.apache.spark.sql.functions.size data.select($"topic", size($"subjects")).show +-----+--------------+ |topic|size(subjects)| +-----+--------------+ | pets| 2| +-----+--------------+ 

e é geralmente fácil escrever UDFs personalizadas para executar operações arbitrárias

 import org.apache.spark.sql.functions.udf val enhance = udf { topic : String => topic.toUpperCase() } data.select(enhance($"topic"), size($"subjects")).show +----------+--------------+ |UDF(topic)|size(subjects)| +----------+--------------+ | PETS| 2| +----------+--------------+ 

Mas e se eu quiser usar uma UDF para manipular a matriz de objects na coluna “assuntos”? Que tipo eu uso para o argumento no UDF? Por exemplo, se eu quiser reimplementar a function size, em vez de usar a fornecida por spark:

 val my_size = udf { subjects: Array[Something] => subjects.size } data.select($"topic", my_size($"subjects")).show 

Claramente Array[Something] não funciona … que tipo devo usar!? Devo abandonar Array[] completamente? scala.collection.mutable.WrappedArray me diz scala.collection.mutable.WrappedArray pode ter algo a ver com isso, mas ainda há outro tipo que eu preciso fornecer.

O que você está procurando é Seq[oassql.Row] :

 import org.apache.spark.sql.Row val my_size = udf { subjects: Seq[Row] => subjects.size } 

Explicação :

  • A representação atual do ArrayType é, como você já sabe, o WrappedArray , portanto o Array não funcionará e é melhor ficar do lado seguro.
  • Tipo local para StructType é Row . Infelizmente, isso significa que o access aos campos individuais não é seguro para o tipo.

Notas :

  • Para criar a function struct passada para o udf tem que retornar o tipo de Product ( Tuple* ou case class ), não a Row .