Spark: como obter o número de linhas escritas?

Eu estou querendo saber se existe uma maneira de saber o número de linhas escritas por uma operação de salvamento do Spark. Eu sei que é o suficiente para fazer uma contagem no RDD antes de escrevê-lo, mas gostaria de saber se existe uma maneira de ter a mesma informação sem fazê-lo.

Obrigado Marco

Se você realmente quiser, pode adicionar ouvinte personalizado e extrair o número de linhas escritas de outputMetrics . Um exemplo muito simples pode ser assim:

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} var recordsWrittenCount = 0L sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { synchronized { recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten } } }) sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") recordsWrittenCount // Long = 10 

mas esta parte da API destina-se ao uso interno.

A resposta aceita mais de perto corresponde às necessidades específicas dos OPs (como explicitado em vários comentários), no entanto, esta resposta irá atender a maioria.

A abordagem mais eficiente é usar um Acumulador: http://spark.apache.org/docs/latest/programming-guide.html#accumulators

 val accum = sc.accumulator(0L) data.map { x => accum += 1 x } .saveAsTextFile(path) val count = accum.value 

Você pode então envolver isso em um cafetão útil:

 implicit class PimpedStringRDD(rdd: RDD[String]) { def saveAsTextFileAndCount(p: String): Long = { val accum = rdd.sparkContext.accumulator(0L) rdd.map { x => accum += 1 x } .saveAsTextFile(p) accum.value } } 

Então você pode fazer

 val count = data.saveAsTextFileAndCount(path) 

Se você olhar

 taskEnd.taskInfo.accumulables 

Você verá que ele é fornecido com o AccumulableInfo seguinte em ListBuffer em uma ordem seqüencial.

 AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql) 

Você pode ver claramente que o número de linhas de saída está na 7ª posição do listBuffer, portanto, a maneira correta de obter as linhas sendo gravadas é

 taskEnd.taskInfo.accumulables(6).value.get 

Podemos obter as linhas escritas da seguinte maneira (acabei de modificar a resposta do @3232s)

 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} var recordsWrittenCount = 0L sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { synchronized { recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] } } }) sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") recordsWrittenCount