Como faço para pular um header de arquivos CSV no Spark?

Suponha que eu dê três caminhos de arquivos para um contexto do Spark para ler e cada arquivo tenha um esquema na primeira linha. Como podemos ignorar linhas de esquema de headers?

val rdd=sc.textFile("file1,file2,file3") 

Agora, como podemos pular as linhas de header deste rdd?

   

Se houvesse apenas uma linha de header no primeiro registro, a maneira mais eficiente de filtrá-la seria:

 rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter } 

Isso não ajuda se, claro, houver muitos arquivos com muitas linhas de header dentro. Você pode unir três RDDs que você faz dessa maneira.

Você também pode simplesmente escrever um filter que corresponda apenas a uma linha que poderia ser um header. Isso é bem simples, mas menos eficiente.

Equivalente Python:

 from itertools import islice rdd.mapPartitionsWithIndex( lambda idx, it: islice(it, 1, None) if idx == 0 else it ) 
 data = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(row => row != header) #filter out header 

No Spark 2.0, um leitor CSV é criado no Spark, para que você possa carregar facilmente um arquivo CSV da seguinte maneira:

 spark.read.option("header","true").csv("filePath") 

A partir do Spark 2.0 , o que você pode fazer é usar o SparkSession para fazer isso como um forro:

 val spark = SparkSession.builder.config(conf).getOrCreate() 

e então como @SandeepPurohit disse:

 val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath) 

Espero que tenha resolvido sua pergunta!

PS: SparkSession é o novo ponto de input introduzido no Spark 2.0 e pode ser encontrado no pacote spark_sql

Você pode carregar cada arquivo separadamente, filtrá-los com file.zipWithIndex().filter(_._2 > 0) e depois unir todos os arquivos RDDs.

Se o número de arquivos é muito grande, a união poderia lançar um StackOverflowExeption .

No PySpark, você pode usar um dataframe e definir o header como True:

 df = spark.read.csv(dataPath, header=True) 

Use o método filter() no PySpark, filtrando o primeiro nome da coluna para remover o header:

 # Read file (change format for other file formats) contentRDD = sc.textfile() # Filter out first column of the header filterDD = contentRDD.filter(lambda l: not l.startswith() # Check your result for i in filterDD.take(5) : print (i) 

Como alternativa, você pode usar o pacote spark-csv (ou no Spark 2.0, isso está mais ou menos disponível nativamente como CSV). Note que isto espera o header em cada arquivo (como você deseja):

 schema = StructType([ StructField('lat',DoubleType(),True), StructField('lng',DoubleType(),True)]) df = sqlContext.read.format('com.databricks.spark.csv'). \ options(header='true', delimiter="\t", treatEmptyValuesAsNulls=True, mode="DROPMALFORMED").load(input_file,schema=schema) 

É uma opção que você passa para o comando read() :

 context = new org.apache.spark.sql.SQLContext(sc) var data = context.read.option("header","true").csv("") 

Trabalhando em 2018 (Spark 2.3)

Python

 df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv") 

Scala

val myDf = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

PD1: myManualSchema é um esquema predefinido escrito por mim, você pode pular essa parte do código

Isso deve funcionar bem

 def dropHeader(data: RDD[String]): RDD[String] = { data.filter(r => r!=data.first) } 
 //Find header from the files lying in the directory val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{ case (fileName, stream)=> val header = new BufferedReader(new InputStreamReader(stream.open())).readLine() (fileName, header) }.collect().toMap val fileNameHeaderBr = sc.broadcast(fileNameHeader) // Now let's skip the header. mapPartition will ensure the header // can only be the first line of the partition sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter => if(iter.hasNext){ val firstLine = iter.next() println(s"Comparing with firstLine $firstLine") if(firstLine == fileNameHeaderBr.value.head._2) new WrappedIterator(null, iter) else new WrappedIterator(firstLine, iter) } else { iter } ).collect().foreach(println) class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{ var isFirstIteration = true override def hasNext: Boolean = { if (isFirstIteration && firstLine != null){ true } else{ iter.hasNext } } override def next(): String = { if (isFirstIteration){ println(s"For the first time $firstLine") isFirstIteration = false if (firstLine != null){ firstLine } else{ println(s"Every time $firstLine") iter.next() } } else { iter.next() } } } 

Para desenvolvedores de python. Eu testei com spark2.0. Vamos supor que você queira remover as primeiras 14 linhas.

 sc = spark.sparkContext lines = sc.textFile("s3://folder_location_of_csv/") parts = lines.map(lambda l: l.split(",")) parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 

withColumn é a function df. Então, abaixo não funcionará no estilo RDD como usado acima.

 parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)