Stackoverflow devido à longa linhagem RDD

Eu tenho milhares de pequenos arquivos no HDFS. Precisa processar um subconjunto de arquivos um pouco menor (que é novamente em milhares), fileList contém uma lista de caminhos de arquivos que precisam ser processados.

// fileList == list of filepaths in HDFS var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD for (i  line.startsWith("#####")).map(line => (filePath, line)) masterRDD = masterRDD.union(sampleRDD) } masterRDD.first() 

// Uma vez fora do loop, a execução de qualquer ação resulta em erro de stackoverflow devido à longa linhagem de RDD

 Exception in thread "main" java.lang.StackOverflowError at scala.runtime.AbstractFunction1.(AbstractFunction1.scala:12) at org.apache.spark.rdd.UnionRDD$$anonfun$1.(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) ===================================================================== ===================================================================== at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 

Em geral, você pode usar pontos de verificação para quebrar linhagens longas. Alguns mais ou menos semelhantes a isso devem funcionar:

 import org.apache.spark.rdd.RDD import scala.reflect.ClassTag val checkpointInterval: Int = ??? def loadAndFilter(path: String) = sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _)) def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) (acc: RDD[T], xi: (RDD[T], Int)) = { if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint else xi._1.union(acc) } val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] fileList.map(loadAndFilter).zipWithIndex .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval)) 

Nesta situação particular, uma solução muito mais simples deve ser usar o método SparkContext.union :

 val masterRDD = sc.union( fileList.map(path => sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _))) ) 

Uma diferença entre esses methods deve ser óbvia quando você der uma olhada no DAG gerado por loop / reduce :

insira a descrição da imagem aqui

e uma union única:

insira a descrição da imagem aqui

É claro que, se os arquivos são pequenos, você pode combinar o wholeTextFiles com o flatMap e ler todos os arquivos de uma só vez:

 sc.wholeTextFiles(fileList.mkString(",")) .flatMap{case (path, text) => text.split("\n").filter(_.startsWith("#####")).map((path, _))}