13

I'm wondering if there is a way to know the number of lines written by a Spark save operation. I know that it's enough to do a count on the RDD before writing it, but I'd like to know if there is a way to have the same info without doing it.

Thank you, Marco

mgaido
  • It may be a duplicate of http://stackoverflow.com/questions/28413423/count-number-of-rows-in-an-rdd – Amit Kumar May 28 '16 at 19:19
  • @amit_kumar I don't think it's a duplicate; I think he wants to count it and save it without having to pass over the data twice. – samthebest May 29 '16 at 11:38

3 Answers

17

If you really want to, you can add a custom listener and extract the number of written rows from outputMetrics. A very simple example can look like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

// Register a listener that, after each task finishes, adds that task's
// written-record count (from its output metrics) to the running total.
sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10

Note, however, that this part of the API is intended for internal usage.
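
If you need the count per save rather than a global running total, one option is to keep a single listener registered and reset its counter before each write. The following is a minimal sketch, not part of the original answer; the object and method names are made up for illustration, and it relies only on the public APIs used above:

import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper object reusing the same listener pattern as above.
object WriteCounter extends SparkListener {
  @volatile var recordsWritten = 0L
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    recordsWritten += taskEnd.taskMetrics.outputMetrics.recordsWritten
  }
  def reset(): Unit = synchronized { recordsWritten = 0L }
}

sc.addSparkListener(WriteCounter)

// Hypothetical wrapper: reset, save, then read the total for this job.
// Listener events are delivered asynchronously, so the value may lag
// slightly behind the end of the action.
def saveAndCount(rdd: RDD[String], path: String): Long = {
  WriteCounter.reset()
  rdd.saveAsTextFile(path)
  WriteCounter.recordsWritten
}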

zero323
8

The accepted answer more closely matches the OP's specific needs (as made explicit in various comments); nevertheless, this answer will suit the majority.

The most efficient approach is to use an Accumulator: http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value

You can then wrap this in a useful pimp:

import org.apache.spark.rdd.RDD

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)

    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)

    accum.value
  }
}

So you can do

val count = data.saveAsTextFileAndCount(path)
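
As an aside (not part of the original answer): on Spark 2.x, sc.accumulator is deprecated in favour of the AccumulatorV2 API, and the same pattern can be written with longAccumulator. A minimal sketch, with a hypothetical class and method name:

import org.apache.spark.rdd.RDD

implicit class CountingSaveRDD(rdd: RDD[String]) {
  // Hypothetical name; same idea as saveAsTextFileAndCount, but using the
  // non-deprecated longAccumulator from the Spark 2.x API.
  def saveAsTextFileWithCount(p: String): Long = {
    val accum = rdd.sparkContext.longAccumulator("records written")
    rdd.map { x => accum.add(1L); x }.saveAsTextFile(p)
    accum.value
  }
}

The caveat raised in the comments still applies: because the accumulator is updated inside a transformation, re-executed tasks can in principle be counted more than once.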
samthebest
  • I know this kind of approach, but I'd like to avoid it for two main reasons: using it in a transformation means that the result cannot be trusted in case of failures, and there is in any case a (small) overhead. I was just wondering if there is a counter accessible somehow, like there is in MapReduce, since the web UI shows the number of rows written... – mgaido May 29 '16 at 11:44
  • Well, thank you for your answer... even though I keep on wondering how they can show this info on the web UI if there is no internal counter... – mgaido May 29 '16 at 11:59
    @mark91 Ah, well, you could clone the UI code and dig through it, I guess. Having read the documentation, the code I've given is fine (Spark says it protects against restarted tasks). It seems what you want to protect against is the RDD being transformed multiple times, but in the code I've given the rdd isn't accessible outside the pimp's scope; it only accumulates before writing, and only accumulates once. – samthebest May 29 '16 at 12:01
  • count = rdd.count(); rdd.saveAsTextFile(p); Is this in any way better? – Amit Kumar May 29 '16 at 12:18
  • @amit_kumar If the RDD is not cached, this should be more efficient than a separate count, because the data will be materialized only once. – zero323 May 29 '16 at 12:30
  • As far as I understand, modifying an accumulator inside a transformation (i.e. map in your case) might result in an invalid value. – Konstantin Kulagin May 13 '17 at 14:32
2

If you look at

taskEnd.taskInfo.accumulables

you will see that it is bundled with the following AccumulableInfo entries in a ListBuffer, in sequential order:

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))

You can clearly see that the number of output rows is at the 7th position of the ListBuffer, so the correct way to get the count of rows written is

taskEnd.taskInfo.accumulables(6).value.get

We can get the rows written in the following way (I just modified @zero323's answer):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
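
Indexing by position is fragile, since the set of accumulables can differ between Spark versions, stages, and query plans. A hedged alternative, not from the original answer, is to look the metric up by its name instead:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var rowsWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    // Find the "number of output rows" accumulable by name rather than by index;
    // tasks that do not report this metric simply contribute nothing.
    val written = taskEnd.taskInfo.accumulables
      .find(_.name.contains("number of output rows"))
      .flatMap(_.value)
      .collect { case n: Long => n }
      .getOrElse(0L)
    rowsWritten += written
  }
})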
Ramesh Maharjan
  • It is not that simple, unfortunately. I had several tasks, i.e. stage 1's task outputted 100 rows, and stage 2's task (which writes into the DB) outputted 30 rows. So just adding them up naively would give 130 rows instead of 30. – beatrice Feb 14 '22 at 15:26
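
That over-count is a real pitfall of summing the "number of output rows" accumulable across every task. A minimal sketch of one way around it (an assumption, not from the answer above): key the totals by taskEnd.stageId and use taskMetrics.outputMetrics.recordsWritten as in the accepted answer, so that only the stage that actually writes contributes a non-zero amount:

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Totals keyed by stage id; with outputMetrics.recordsWritten only the
// stage that actually performs the write should report anything.
val writtenPerStage = mutable.Map.empty[Int, Long].withDefaultValue(0L)

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    writtenPerStage(taskEnd.stageId) +=
      taskEnd.taskMetrics.outputMetrics.recordsWritten
  }
})

// After the save, inspect writtenPerStage and read off the writing stage's total.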