Spark – carrega o arquivo CSV como DataFrame?

Eu gostaria de ler um CSV em faísca e convertê-lo como DataFrame e armazená-lo em HDFS com df.registerTempTable("table_name")

Eu tentei:

 scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") 

Erro que recebi:

 java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Qual é o comando certo para carregar o arquivo CSV como DataFrame no Apache Spark?

O spark-csv faz parte da funcionalidade principal do Spark e não requer uma biblioteca separada. Então você poderia apenas fazer por exemplo

 df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

analisar CSV como DataFrame / DataSet com Spark 2.x

Primeiro inicializar o object SparkSession por padrão, ele estará disponível em shells como spark

 val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Spark CSV Reader") .getOrCreate; 

Use qualquer uma das seguintes maneiras de carregar CSV como DataFrame/DataSet

1. Faça isso de maneira programática

  val df = spark.read .format("csv") .option("header", "true") //reading the headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv") 

2. Você também pode fazer isso da maneira SQL

  val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`") 

Dependências :

  "org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0, 


Versão Spark <2,0

 val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path"); 

Dependências:

 "org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST, 

É para quem o Hadoop é o 2.6 e o ​​Spark é 1.6 e sem o pacote “databricks”.

 import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema) 

Com o Spark 2.0, segue como você pode ler CSV

 val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path) 

No Java 1.8 Este trecho de código funciona perfeitamente para ler arquivos CSV

POM.xml

  org.apache.spark spark-core_2.11 2.0.0    org.apache.spark spark-sql_2.10 2.0.0    org.scala-lang scala-library 2.11.8   com.databricks spark-csv_2.10 1.4.0  

Java

 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show(); 

Penny’s Spark 2 exemplo é o caminho para fazê-lo em spark2. Há mais um truque: ter esse header gerado para você fazendo uma varredura inicial dos dados, configurando a opção inferSchema como true

Aqui, então, assumindo que spark é uma session de ignição que você configurou, é a operação para carregar no arquivo de índice CSV de todas as imagens Landsat que o host amazon no S3.

  /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz") 

A má notícia é: isso triggers uma varredura através do arquivo; para algo grande como este arquivo CSV de 20 + MB compactado, que pode levar 30 segundos em uma conexão de longa distância. Tenha isso em mente: é melhor você codificar manualmente o esquema assim que ele estiver chegando.

(código de fragment Apache Software License 2.0 licenciado para evitar toda a ambigüidade; algo que fiz como um teste de demonstração / integração de integração S3)

Formato de arquivo padrão é Parquet com spark.read .. e arquivo de leitura csv que porque você está recebendo a exceção. Especifique o formato csv com a API que você está tentando usar

Há muitos desafios para analisar um arquivo CSV, ele continua sumndo se o tamanho do arquivo for maior, se houver caracteres não-ingleses / escape / separador / outros nos valores da coluna, que podem causar erros de análise.

A mágica então está nas opções que são usadas. Os que funcionaram para mim e espero que cubram a maioria dos casos de borda estão no código abaixo:

 ### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True) 

Espero que ajude. Para mais detalhes: Usando o PySpark 2 para ler CSV com código-fonte HTML

Observação: o código acima é da API do Spark 2, em que a API de leitura de arquivos CSV vem acompanhada de pacotes internos do Spark instaláveis.

Nota: O PySpark é um wrapper do Python para o Spark e compartilha a mesma API do Scala / Java.