Articles of apache spark sql

Achatando Linhas no Spark

Eu estou fazendo alguns testes de faísca usando scala. Geralmente lemos arquivos json que precisam ser manipulados como no exemplo a seguir: test.json: {“a”:1,”b”:[2,3]} val test = sqlContext.read.json(“test.json”) Como posso convertê-lo para o seguinte formato: {“a”:1,”b”:2} {“a”:1,”b”:3}

Como alterar os tipos de coluna no DataFrame do Spark SQL?

Suponha que eu esteja fazendo algo como: val df = sqlContext.load(“com.databricks.spark.csv”, Map(“path” -> “cars.csv”, “header” -> “true”)) df.printSchema() root |– year: string (nullable = true) |– make: string (nullable = true) |– model: string (nullable = true) |– comment: string (nullable = true) |– blank: string (nullable = true) df.show() year make model comment blank […]

Como evitar colunas duplicadas após a junit?

Eu tenho dois frameworks de dados com as seguintes colunas: df1.columns // Array(ts, id, X1, X2) e df2.columns // Array(ts, id, Y1, Y2) Depois eu faço val df_combined = df1.join(df2, Seq(ts,id)) Acabo com as seguintes colunas: Array(ts, id, X1, X2, ts, id, Y1, Y2) . Eu poderia esperar que as colunas comuns fossem descartadas. Existe […]

Como definir o esquema para o tipo personalizado no Spark SQL?

O código de exemplo a seguir tenta colocar alguns objects de caso em um dataframe. O código inclui a definição de uma hierarquia de objects de caso e uma class de caso usando esse traço: import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.SQLContext sealed trait Some case object AType extends Some case object BType extends Some case class […]

Processando vários arquivos como RDDs independentes em paralelo

Eu tenho um cenário em que um certo número de operações, incluindo um grupo por, tem que ser aplicado em vários arquivos pequenos (~ 300MB cada). A operação parece com isso .. df.groupBy(….).agg(….) Agora, para processá-lo em vários arquivos, posso usar um curinga “/**/*.csv”, no entanto, isso cria um único RDD e particiona-o para as […]

Como executar transformações independentes em paralelo usando o PySpark?

Eu estou tentando executar 2 funções fazendo transformações completamente independentes em um único RDD em paralelo usando o PySpark. Quais são alguns methods para fazer o mesmo? def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == “__main__”: sc = SparkContext(appName=”parallelTransforms”) sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql(“select * from tables.X_table”) p1 […]

Spark – carrega o arquivo CSV como DataFrame?

Eu gostaria de ler um CSV em faísca e convertê-lo como DataFrame e armazená-lo em HDFS com df.registerTempTable(“table_name”) Eu tentei: scala> val df = sqlContext.load(“hdfs:///csv/file/dir/file.csv”) Erro que recebi: java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at […]

O SparkSQL oferece suporte à subconsulta?

Eu estou executando esta consulta no shell Spark, mas me dá erro, sqlContext.sql( “select sal from samplecsv where sal < (select MAX(sal) from samplecsv)" ).collect().foreach(println) erro: java.lang.RuntimeException: [1.47] falha: “) ” esperado mas identificador MAX encontrado select sal from samplecsv onde sal <(selecione MAX (sal) de samplecsv) ^ em scala.sys.package $ .error (package.scala: 27) Alguém […]

Como definir o particionamento do DataFrame?

Eu comecei a usar o Spark SQL e DataFrames no Spark 1.4.0. Eu estou querendo definir um particionador personalizado em DataFrames, no Scala, mas não vendo como fazer isso. Uma das tabelas de dados com as quais estou trabalhando contém uma lista de transactions, por conta, silimar para o exemplo a seguir. Account Date Type […]

Conjunto de dados do Spark 2.0 vs DataFrame

começando com faísca 2.0.1 Eu tenho algumas perguntas. Eu li muita documentação, mas até agora não consegui encontrar respostas suficientes: Qual é a diferença entre df.select(“foo”) df.select($”foo”) eu entendi corretamente que myDataSet.map(foo.someVal) é typesafe e não será convertido em RDD mas permanece na representação DataSet / sem sobrecarga adicional (desempenho sábio para 2.0.0) todos os […]