Como ler vários arquivos de texto em um único RDD?

Eu quero ler um monte de arquivos de texto de um local hdfs e realizar o mapeamento em uma iteração usando faísca.

JavaRDD records = ctx.textFile(args[1], 1); é capaz de ler apenas um arquivo por vez.

Eu quero ler mais de um arquivo e processá-los como um único RDD. Como?

Você pode especificar diretórios inteiros, usar curingas e até mesmo CSV de diretórios e curingas. Por exemplo:

 sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file") 

Como aponta Nick Chammas, isso é uma exposição do FileInputFormat do Hadoop e, portanto, isso também funciona com o Hadoop (e Scalding).

Use union seguinte maneira:

 val sc = new SparkContext(...) val r1 = sc.textFile("xxx1") val r2 = sc.textFile("xxx2") ... val rdds = Seq(r1, r2, ...) val bigRdd = sc.union(rdds) 

Então o bigRdd é o RDD com todos os arquivos.

Você pode usar uma única chamada textFile para ler vários arquivos. Scala:

 sc.textFile(','.join(files)) 

Você pode usar isso

Primeiro você pode obter um buffer / lista de caminhos S3:

 import scala.collection.JavaConverters._ import java.util.ArrayList import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.ObjectListing import com.amazonaws.services.s3.model.S3ObjectSummary import com.amazonaws.services.s3.model.ListObjectsRequest def listFiles(s3_bucket:String, base_prefix : String) = { var files = new ArrayList[String] //S3 Client and List Object Request var s3Client = new AmazonS3Client(); var objectListing: ObjectListing = null; var listObjectsRequest = new ListObjectsRequest(); //Your S3 Bucket listObjectsRequest.setBucketName(s3_bucket) //Your Folder path or Prefix listObjectsRequest.setPrefix(base_prefix) //Adding s3:// to the paths and adding to a list do { objectListing = s3Client.listObjects(listObjectsRequest); for (objectSummary <- objectListing.getObjectSummaries().asScala) { files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); } listObjectsRequest.setMarker(objectListing.getNextMarker()); } while (objectListing.isTruncated()); //Removing Base Directory Name files.remove(0) //Creating a Scala List for same files.asScala } 

Agora passe este object de lista para o seguinte trecho de código, note: sc é um object de SQLContext

 var df: DataFrame = null; for (file <- files) { val fileDf= sc.textFile(file) if (df!= null) { df= df.unionAll(fileDf) } else { df= fileDf } } 

Agora você tem um RDD Unificado final, ou seja, df

Opcional, e você também pode reparticioná-lo em um único BigRDD

 val files = sc.textFile(filename, 1).repartition(1) 

Reparticionamento sempre funciona: D

No PySpark, eu encontrei uma maneira adicional útil para analisar arquivos. Talvez exista um equivalente em Scala, mas não estou confortável o bastante com uma tradução funcional. É, na verdade, uma chamada textFile com a adição de labels (no exemplo abaixo, a chave = nome do arquivo, valor = 1 linha do arquivo).

TextFile “marcado”

input:

 import glob from pyspark import SparkContext SparkContext.stop(sc) sc = SparkContext("local","example") # if running locally sqlContext = SQLContext(sc) for filename in glob.glob(Data_File + "/*"): Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 

output: matriz com cada input contendo uma tupla usando nome de arquivo como chave e com valor = cada linha de arquivo. (Tecnicamente, usando esse método, você também pode usar uma chave diferente além do nome do caminho de arquivo real – talvez uma representação de hashing para economizar memory). ie.

 [('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'), ...] 

Você também pode recombinar como uma lista de linhas:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

 [('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']), ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])] 

Ou recombine arquivos inteiros de volta para cadeias únicas (neste exemplo, o resultado é o mesmo que você obtém de wholeTextFiles, mas com a string “file:” removida do caminho do arquivo):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

você pode usar – registros JavaRDD = sc.wholeTextFiles (“caminho do seu diretório”) aqui você irá obter o caminho do seu arquivo e conteúdo desse arquivo. para que você possa executar qualquer ação de um arquivo inteiro por vez, economizando a sobrecarga

Todas as respostas estão corretas com sc.textFile

Eu só estava me perguntando por que não wholeTextFiles neste caso …

 sc.wholeTextFiles(yourfileListFromFolder.mkString(",")) .flatMap{case (path, text) ... 

Uma limitação é que, temos que carregar arquivos pequenos, caso contrário o desempenho será ruim e pode levar à OOM.

Nota :

  • O wholefile deve caber na memory
  • Bom para formatos de arquivo que NÃO são divisíveis por linha … como arquivos XML

Mais referência para visitar

Há uma solução limpa direta disponível. Use o método wholeTextFiles (). Isso levará um diretório e formará um par de valores-chave. O RDD retornado será um par RDD. Encontre abaixo a descrição dos documentos do Spark :

SparkContext.wholeTextFiles permite ler um diretório contendo vários arquivos de texto pequenos e retorna cada um deles como pares (nome de arquivo, conteúdo). Isso está em contraste com textFile, que retornaria um registro por linha em cada arquivo

EXPERIMENTE A interface usada para gravar um DataFrame em sistemas de armazenamento externos (por exemplo, filesystems, armazenamentos de valores-chave, etc.). Use DataFrame.write () para acessar isso.

Novo na versão 1.4.

csv (caminho, modo = Nenhum, compactação = Nenhum, sep = Nenhum, citação = Nenhum, escape = Nenhum, header = Nenhum, nullValue = Nenhum, escapeQuotes = Nenhum, quoteAll = Nenhum, dateFormat = Nenhum, timestampFormat = None) Salva o conteúdo do DataFrame no formato CSV no caminho especificado.

parameters: path – o caminho em qualquer modo de sistema de arquivos suportado pelo Hadoop – especifica o comportamento da operação de salvamento quando os dados já existirem.

acrescentar: Anexe o conteúdo deste DataFrame aos dados existentes. Sobrescrever: sobrescreve os dados existentes. ignore: silenciosamente ignore esta operação se os dados já existirem. erro (caso padrão): lança uma exceção se os dados já existirem. compression – codec de compression para usar ao salvar em arquivo. Esse pode ser um dos nomes abreviados de maiúsculas e minúsculas conhecidos (nenhum, bzip2, gzip, lz4, snappy e deflate). sep – define o caractere único como um separador para cada campo e valor. Se Nenhum estiver definido, ele usa o valor padrão,. quote – define o caractere único usado para escaping valores entre aspas, em que o separador pode fazer parte do valor. Se None estiver definido, ele usa o valor padrão, “. Se você deseja desativar as cotações, é necessário definir uma string vazia. Escape – configura o caractere único usado para aspas dentro de um valor já citado. Se Nenhum estiver definido , usa o valor padrão, \ escapeQuotes – Um sinalizador indicando se os valores contendo aspas devem sempre ser colocados entre aspas. Se Nenhum estiver definido, ele usa o valor padrão true, escapando de todos os valores que contêm um caractere de aspas. todos os valores devem sempre ser colocados entre aspas.Se Nenhum estiver definido, ele usa o valor padrão false, apenas os valores de escape que contêm um caractere de aspas header – grava os nomes das colunas como a primeira linha.Se None está definido, ele usa o padrão Valor null.Valor – define a representação da string de um valor nulo.Se Nenhum estiver definido, ele usa o valor padrão, string vazia.deFormatar – define a string que indica um formato de data.Os formatos de data personalizados seguem os formatos em java.text .SimpleDate Formato: aplica-se ao tipo de data. Se None estiver definido, ele usa o valor do valor padrão, aaaa-MM-dd. timestampFormat – define a cadeia que indica um formato de registro de data e hora. Os formatos de data personalizados seguem os formatos em java.text.SimpleDateFormat. Isso se aplica ao tipo de registro de data e hora. Se None estiver definido, ele usa o valor do valor padrão, aaaa-MM-dd’T’HH: mm: ss.SSSZZ.

 rdd = textFile('/data/{1.txt,2.txt}')