Gravar em várias saídas pela chave Spark – um trabalho do Spark

Como você pode gravar em várias saídas dependendo da chave usando o Spark em um único trabalho.

Relacionados: Gravar em várias saídas por chave Scalding Hadoop, uma tarefa MapReduce

Por exemplo

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .writeAsMultiple(prefix, compressionCodecOption) 

garantiria cat prefix/1 seja

 a b 

e cat prefix/2 seria

 c 

Responda

Para resposta exata com importações completas, codec pimp e de compactação, consulte https://stackoverflow.com/a/46118044/1586965

Se você usar o Spark 1.4+, isso se tornou muito mais fácil graças à API do DataFrame . (Os DataFrames foram introduzidos no Spark 1.3, mas o partitionBy() , que precisamos, foi introduzido em 1.4 .)

Se você está começando com um RDD, primeiro precisa convertê-lo em um DataFrame:

 val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie"))) val people_df = people_rdd.toDF("number", "name") 

Em Python, esse mesmo código é:

 people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")]) people_df = people_rdd.toDF(["number", "name"]) 

Depois de ter um DataFrame, é simples gravar em várias saídas com base em uma chave específica. Além do mais – e esta é a beleza da API DataFrame – o código é praticamente o mesmo em Python, Scala, Java e R:

 people_df.write.partitionBy("number").text("people") 

E você pode facilmente usar outros formatos de saída se quiser:

 people_df.write.partitionBy("number").json("people-json") people_df.write.partitionBy("number").parquet("people-parquet") 

Em cada um desses exemplos, o Spark criará um subdiretório para cada uma das chaves em que particionamos o DataFrame:

 people/ _SUCCESS number=1/ part-abcd part-efgh number=2/ part-abcd part-efgh 

Eu faria assim, escalável

 import org.apache.hadoop.io.NullWritable import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } object Split { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Split" + args(1)) val sc = new SparkContext(conf) sc.textFile("input/path") .map(a => (k, v)) // Your own implementation .partitionBy(new HashPartitioner(num)) .saveAsHadoopFile("output/path", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) spark.stop() } } 

Acabei de ver uma resposta semelhante acima, mas na verdade não precisamos de partições personalizadas. O MultipleTextOutputFormat criará o arquivo para cada chave. Não há problema em que vários registros com as mesmas chaves caiam na mesma partição.

new HashPartitioner (num), onde o num é o número da partição que você deseja. Caso você tenha um grande número de chaves diferentes, você pode definir o número como grande. Nesse caso, cada partição não abrirá muitos manipuladores de arquivos hdfs.

Se você tiver potencialmente muitos valores para uma determinada chave, acho que a solução escalável é gravar um arquivo por chave por partição. Infelizmente não há suporte embutido para isso no Spark, mas podemos agitar alguma coisa.

 sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) .mapPartitionsWithIndex { (p, it) => val outputs = new MultiWriter(p.toString) for ((k, v) < - it) { outputs.write(k.toString, v) } outputs.close Nil.iterator } .foreach((x: Nothing) => ()) // To trigger the job. // This one is Local, but you could write one for HDFS class MultiWriter(suffix: String) { private val writers = collection.mutable.Map[String, java.io.PrintWriter]() def write(key: String, value: Any) = { if (!writers.contains(key)) { val f = new java.io.File("output/" + key + "/" + suffix) f.getParentFile.mkdirs writers(key) = new java.io.PrintWriter(f) } writers(key).println(value) } def close = writers.values.foreach(_.close) } 

(Substitua o PrintWriter pela sua escolha de operação distribuída do sistema de arquivos.)

Isso faz uma única passagem sobre o RDD e não executa shuffle. Dá-lhe um diretório por chave, com um número de arquivos dentro de cada um.

Isso inclui o codec solicitado, as importações necessárias e o cafetão, conforme solicitado.

 import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext // TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) { def writeAsMultiple(prefix: String, codec: String, keyName: String = "key") (implicit sqlContext: SQLContext): Unit = { import sqlContext.implicits._ rdd.toDF(keyName, "_2").write.partitionBy(keyName) .format("text").option("codec", codec).save(prefix) } } val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"))) myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Uma diferença sutil para o OP é que ele prefixará = para os nomes de diretório. Por exemplo

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec") 

Daria:

 prefix/key=1/part-00000 prefix/key=2/part-00000 

em que prefix/my_number=1/part-00000 conteria as linhas b , e prefix/my_number=2/part-00000 conteria a linha c .

E

 myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo") 

Daria:

 prefix/foo=1/part-00000 prefix/foo=2/part-00000 

Deve ficar claro como editar para o parquet .

Finalmente, abaixo, há um exemplo para o Dataset , que talvez seja melhor do que usar o Tuples.

 implicit class PimpedDataset[T](dataset: Dataset[T]) { def writeAsMultiple(prefix: String, codec: String, field: String): Unit = { dataset.write.partitionBy(field) .format("text").option("codec", codec).save(prefix) } } 

Eu tenho uma necessidade semelhante e encontrei um caminho. Mas tem uma desvantagem (o que não é um problema para o meu caso): você precisa reparticionar seus dados com uma partição por arquivo de saída.

Para particionar dessa forma, geralmente é necessário saber de antemão quantos arquivos a tarefa produzirá e encontrar uma function que mapeie cada chave para cada partição.

Primeiro vamos criar nossa class baseada em MultipleTextOutputFormat:

 import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] { override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = { key.toString } override protected def generateActualKey(key: T, value: V) = { null } } 

Com essa class, o Spark obtém uma chave de uma partição (a primeira / última, eu acho) e nomeia o arquivo com essa chave, portanto, não é bom misturar várias chaves na mesma partição.

Para o seu exemplo, você precisará de um particionador personalizado. Isso fará o trabalho:

 import org.apache.spark.Partitioner class IdentityIntPartitioner(maxKey: Int) extends Partitioner { def numPartitions = maxKey def getPartition(key: Any): Int = key match { case i: Int if i < maxKey => i } } 

Agora vamos juntar tudo:

 val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e"))) // You need to know the max number of partitions (files) beforehand // In this case we want one partition per key and we have 3 keys, // with the biggest key being 7, so 10 will be large enough val partitioner = new IdentityIntPartitioner(10) val prefix = "hdfs://.../prefix" val partitionedRDD = rdd.partitionBy(partitioner) partitionedRDD.saveAsHadoopFile(prefix, classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]]) 

Isso gerará 3 arquivos sob o prefixo (nomeados 1, 2 e 7), processando tudo em uma única passagem.

Como você pode ver, você precisa de algum conhecimento sobre suas chaves para poder usar esta solução.

Para mim, foi mais fácil porque eu precisava de um arquivo de saída para cada hash de chave e o número de arquivos estava sob meu controle, então eu poderia usar o HashPartitioner para fazer o truque.

saveAsText () e saveAsHadoop (…) são implementados com base nos dados RDD, especificamente pelo método: PairRDD.saveAsHadoopDataset que pega os dados do PairRdd onde é executado. Eu vejo duas opções possíveis: Se seus dados são relativamente pequenos em tamanho, você poderia economizar algum tempo de implementação agrupando sobre o RDD, criando um novo RDD de cada coleção e usando esse RDD para gravar os dados. Algo assim:

 val byKey = dataRDD.groupByKey().collect() val rddByKey = byKey.map{case (k,v) => k->sc.makeRDD(v.toSeq)} val rddByKey.foreach{ case (k,rdd) => rdd.saveAsText(prefix+k} 

Observe que ele não funcionará para grandes conjuntos de dados b / c que a materialização do iterador em v.toSeq pode não caber na memory.

A outra opção que eu vejo, e na verdade a que eu recomendo neste caso é: role sua própria, chamando diretamente a API hadoop / hdfs.

Aqui está uma discussão que comecei enquanto pesquisava esta questão: Como criar RDDs de outro RDD?

Eu tinha um caso de uso semelhante em que dividi o arquivo de input no Hadoop HDFS em vários arquivos com base em uma chave (1 arquivo por chave). Aqui está o meu código scala para faísca

 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; val hadoopconf = new Configuration(); val fs = FileSystem.get(hadoopconf); @serializable object processGroup { def apply(groupName:String, records:Iterable[String]): Unit = { val outFileStream = fs.create(new Path("/output_dir/"+groupName)) for( line < - records ) { outFileStream.writeUTF(line+"\n") } outFileStream.close() } } val infile = sc.textFile("input_file") val dateGrouped = infile.groupBy( _.split(",")(0)) dateGrouped.foreach( (x) => processGroup(x._1, x._2)) 

Eu agrupei os registros com base na chave. Os valores de cada chave são gravados em arquivo separado.

boa notícia para o usuário python no caso de você ter várias colunas e você deseja salvar todas as outras colunas não particionadas no formato csv que falhará se você usar o método “text” como sugestão de Nick Chammas.

 people_df.write.partitionBy("number").text("people") 

mensagem de erro é “AnalysisException: u’Text fonte de dados suporta apenas uma única coluna e você tem 2 colunas .; ‘”

Em faísca 2.0.0 (meu ambiente de teste é faísca do hdp 2.0.0) pacote “com.databricks.spark.csv” está agora integrado, e nos permite salvar o arquivo de texto particionado por apenas uma coluna, veja o exemplo golpe:

 people_rdd = sc.parallelize([(1,"2016-12-26", "alice"), (1,"2016-12-25", "alice"), (1,"2016-12-25", "tom"), (1, "2016-12-25","bob"), (2,"2016-12-26" ,"charlie")]) df = people_rdd.toDF(["number", "date","name"]) df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people") [root@namenode people]# tree . ├── number=1 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv ├── number=2 │?? └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv └── _SUCCESS [root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,alice 2016-12-25,alice 2016-12-25,tom 2016-12-25,bob [root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv 2016-12-26,charlie 

Na minha faísca 1.6.1 ambiente, o código não jogou nenhum erro, no entanto, é apenas um arquivo gerado. não é particionado por duas pastas.

Espero que isso possa ajudar.

Eu tive um caso de uso semelhante. Eu resolvi isso em Java escrevendo duas classs customizadas RecordWriter MultipleTextOutputFormat e RecordWriter .

Minha input era um JavaPairRDD> e eu queria armazená-lo em um arquivo chamado por sua chave, com todas as linhas contidas em seu valor.

Aqui está o código para minha implementação MultipleTextOutputFormat

 class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat { @Override protected String generateFileNameForKeyValue(K key, V value, String name) { return key.toString(); //The return will be used as file name } /** The following 4 functions are only for visibility purposes (they are used in the class MyRecordWriter) **/ protected String generateLeafFileName(String name) { return super.generateLeafFileName(name); } protected V generateActualValue(K key, V value) { return super.generateActualValue(key, value); } protected String getInputFileBasedOutputFileName(JobConf job, String name) { return super.getInputFileBasedOutputFileName(job, name); } protected RecordWriter getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { return super.getBaseRecordWriter(fs, job, name, arg3); } /** Use my custom RecordWriter **/ @Override RecordWriter getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException { final String myName = this.generateLeafFileName(name); return new MyRecordWriter(this, fs, job, arg3, myName); } } 

Aqui está o código para minha implementação de RecordWriter .

 class MyRecordWriter implements RecordWriter { private RDDMultipleTextOutputFormat rddMultipleTextOutputFormat; private final FileSystem fs; private final JobConf job; private final Progressable arg3; private String myName; TreeMap> recordWriters = new TreeMap(); MyRecordWriter(RDDMultipleTextOutputFormat rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) { this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat; this.fs = fs; this.job = job; this.arg3 = arg3; this.myName = myName; } @Override void write(K key, V value) throws IOException { String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName); String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath); Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value); RecordWriter rw = this.recordWriters.get(finalPath); if(rw == null) { rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3); this.recordWriters.put(finalPath, rw); } List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } } @Override void close(Reporter reporter) throws IOException { Iterator keys = this.recordWriters.keySet().iterator(); while(keys.hasNext()) { RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next()); rw.close(reporter); } this.recordWriters.clear(); } } 

A maior parte do código é exatamente a mesma que no FileOutputFormat . A única diferença é essas poucas linhas

 List lines = (List) actualValue; for (String line : lines) { rw.write(null, line); } 

Essas linhas me permitiram escrever cada linha da minha input List no arquivo. O primeiro argumento da function write é definido como null para evitar a write da chave em cada linha.

Para terminar, só preciso fazer essa chamada para gravar meus arquivos

 javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);