Eu estou querendo saber se existe uma maneira de saber o número de linhas escritas por uma operação de salvamento do Spark. Eu sei que é o suficiente para fazer uma contagem no RDD antes de escrevê-lo, mas gostaria de saber se existe uma maneira de ter a mesma informação sem fazê-lo.
Obrigado Marco
Se você realmente quiser, pode adicionar ouvinte personalizado e extrair o número de linhas escritas de outputMetrics
. Um exemplo muito simples pode ser assim:
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} var recordsWrittenCount = 0L sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { synchronized { recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten } } }) sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") recordsWrittenCount // Long = 10
mas esta parte da API destina-se ao uso interno.
A resposta aceita mais de perto corresponde às necessidades específicas dos OPs (como explicitado em vários comentários), no entanto, esta resposta irá atender a maioria.
A abordagem mais eficiente é usar um Acumulador: http://spark.apache.org/docs/latest/programming-guide.html#accumulators
val accum = sc.accumulator(0L) data.map { x => accum += 1 x } .saveAsTextFile(path) val count = accum.value
Você pode então envolver isso em um cafetão útil:
implicit class PimpedStringRDD(rdd: RDD[String]) { def saveAsTextFileAndCount(p: String): Long = { val accum = rdd.sparkContext.accumulator(0L) rdd.map { x => accum += 1 x } .saveAsTextFile(p) accum.value } }
Então você pode fazer
val count = data.saveAsTextFileAndCount(path)
Se você olhar
taskEnd.taskInfo.accumulables
Você verá que ele é fornecido com o AccumulableInfo
seguinte em ListBuffer
em uma ordem seqüencial.
AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)
Você pode ver claramente que o número de linhas de saída está na 7ª posição do listBuffer, portanto, a maneira correta de obter as linhas sendo gravadas é
taskEnd.taskInfo.accumulables(6).value.get
Podemos obter as linhas escritas da seguinte maneira (acabei de modificar a resposta do @3232s)
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} var recordsWrittenCount = 0L sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { synchronized { recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] } } }) sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar") recordsWrittenCount