Spark – Obtendo o nome do arquivo em RDDs

Eu estou tentando processar 4 diretórios de arquivos de texto que continuam crescendo a cada dia. O que eu preciso fazer é, se alguém está tentando procurar um número de fatura, eu deveria dar-lhes a lista de arquivos que o possui.

Eu era capaz de mapear e reduzir os valores em arquivos de texto, carregando-os como RDD.Mas como posso obter o nome do arquivo e outros atributos de arquivo?

Desde o Spark 1.6, você pode combinar a fonte de dados de text e a function input_file_name seguinte forma:

Scala :

 import org.apache.spark.sql.functions.input_file_name val inputPath: String = ??? spark.read.text(inputPath) .select(input_file_name, $"value") .as[(String, String)] // Optionally convert to Dataset .rdd // or RDD 

Python :

( Versões anteriores a 2.x estão com bugs e podem não preservar nomes quando convertidas para RDD ):

 from pyspark.sql.functions import input_file_name (spark.read.text(input_path) .select(input_file_name(), "value")) .rdd) 

Isso pode ser usado com outros formatos de input também.

Se os arquivos de texto forem pequenos o suficiente, você poderá usar o SparkContext.wholeTextFiles que retorna um RDD (filename,content) de (filename,content) .

Se os arquivos de texto forem muito grandes para SparkContext.wholeTextFiles , você usaria um InputFormat personalizado (simples) e, em seguida, chamaria SparkContext.hadoopRDD

O InputFormat precisaria retornar uma tupla (nome do arquivo, linha) ao invés de linha, então você poderia filtrar usando um predicado que examinasse o conteúdo da linha, depois fosse exclusivo e coletasse os nomes dos arquivos.

Do Spark, o código seria algo como:

 val ft = classOf[FileNamerInputFormat] val kt = classOf[String] val vt = classOf[String] val hadoopConfig = new Configuration(sc.hadoopConfiguration) sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig) .filter { case (f, l) => isInteresting(l) } .map { case (f, _) => f } .distinct() .collect() 

Você pode tentar isso se você estiver no pyspark:

  test = sc.wholeTextFiles("pathtofile") 

você obterá um RDD resultante com primeiro elemento = filepath e segundo elemento = conteúdo

Você pode usar o WholeTextFile() para conseguir isso. No entanto, se os arquivos de input forem grandes, seria contraproducente usar WholeTextFile() pois ele coloca todo o conteúdo do arquivo em um único registro.

A melhor maneira de recuperar nomes de arquivos nesse cenário é usar mapPartitionsWithInputSplit() . Você pode encontrar um exemplo de trabalho usando este cenário no meu blog .

Parece um exagero usar o Spark diretamente … Se esses dados forem “coletados” para o driver, por que não usar a API do HDFS? Muitas vezes, o Hadoop é empacotado com o Spark. Aqui está um exemplo:

 import org.apache.hadoop.fs._ import org.apache.hadoop.conf._ val fileSpec = "/data/Invoices/20171123/21" val conf = new Configuration() val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://nameNodeEneteredHere"),conf) val path = new Path(fileSpec) // if(fs.exists(path) && fs.isDirectory(path) == true) ... val fileList = fs.listStatus(path) 

Então, com println(fileList(0)) , info (formatado) como este primeiro item (como um exemplo) pode ser visto como org.apache.hadoop.fs.FileStatus :

 FileStatus { path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; isDirectory=false; length=29665563; replication=3; blocksize=134217728; modification_time=1511810355666; access_time=1511838291440; owner=codeaperature; group=supergroup; permission=rw-r--r--; isSymlink=false } 

Onde fileList(0).getPath fornecerá hdfs://nameNodeEneteredHere/Invoices-0001.avro .

Eu acho que este meio de ler arquivos seria principalmente com o namenode HDFS e não dentro de cada executor. TLDR; Eu aposto que o Spark provavelmente iria pesquisar o namenode para obter RDDs. Se a chamada subjacente do Spark pesquisar o namenode para gerenciar os RDDs, talvez o acima seja uma solução eficiente. Ainda assim, comentários contributivos sugerindo que qualquer direção seria bem-vinda.