6

I want to time my Spark program's execution speed, but due to laziness it's quite difficult. Consider this (meaningless) code:

val graph = GraphLoader.edgeListFile(context, args(0))
val graph_degs = graph.outerJoinVertices(graph.degrees)((_, _, deg) => deg.getOrElse(0)).triplets.cache

/* I'd need to start the timer here */
val t1 = System.currentTimeMillis  
val edges = graph_degs.flatMap(trip =>  { /* do something*/ })
                      .union(graph_degs)

val count = edges.count
val t2 = System.currentTimeMillis 
/* I'd need to stop the timer here */

println("It took " + t2-t1 + " to count " + count)

The thing is, transformations are lazy, so nothing gets evaluated before the val count = edges.count line. But from my point of view t1 gets a value even though the code above it hasn't been evaluated yet... the code above t1 only gets evaluated after the timer has started, regardless of its position in the code. That's a problem...

In the Spark Web UI I can't find anything useful about this, since I need the time spent after that specific line of code. Do you think there is an easy way to see when a group of transformations actually gets evaluated?
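
One workaround I can think of is to force graph_degs to materialize before starting the timer, so that the work above t1 doesn't leak into the measurement - a rough sketch (the flatMap body is just a placeholder), and I'm not sure it's the right approach:

graph_degs.foreach(_ => ())   // throwaway action, forces the cached triplets to be computed

val t1 = System.currentTimeMillis
val count = graph_degs.flatMap(trip => { /* do something */ Iterator(trip) })
                      .union(graph_degs)
                      .count
val t2 = System.currentTimeMillis

println("It took " + (t2 - t1) + " ms to count " + count)

But this still mixes the flatMap, the union and the count into a single number.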

Matt
  • Possible duplicate of [How to profile methods in Scala?](http://stackoverflow.com/questions/9160001/how-to-profile-methods-in-scala) – James Tobin Apr 20 '17 at 17:16
  • 1
    Doesn't seem like a duplicate as this post is specific to Apache Spark, which provides specific measurement tools and presents specific profiling challenges - like the one described here: evaluation is lazy and therefore evaluated block of code might not represent the measured operations. – Tzach Zohar Apr 20 '17 at 17:25

2 Answers

5

Since consecutive transformations (within the same task - meaning they are not separated by shuffles and are performed as part of the same action) are executed as a single "step", Spark does not measure them individually. And from driver code, you can't either.

What you can do is measure the duration of applying your function to each record, and use an Accumulator to sum it all up, e.g.:

// create accumulator
val durationAccumulator = sc.longAccumulator("flatMapDuration")

// "wrap" your "doSomething" operation with time measurement, and add to accumulator
val edges = rdd.flatMap(trip => {
  val t1 = System.currentTimeMillis
  val result = doSomething(trip)
  val t2 = System.currentTimeMillis
  durationAccumulator.add(t2 - t1)
  result
})

// perform the action that would trigger evaluation
val count = edges.count

// now you can read the accumulated value
println("It took " + durationAccumulator.value + " to flatMap " + count)

You can repeat this for any individual transformation, as shown in the sketch below.
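
For example, a rough sketch with a second, hypothetical doSomethingElse step timed by its own accumulator:

// second transformation timed by a second accumulator
val mapDuration = sc.longAccumulator("mapDuration")

val mapped = edges.map { e =>
  val start = System.currentTimeMillis
  val result = doSomethingElse(e)   // doSomethingElse is a placeholder for your own logic
  mapDuration.add(System.currentTimeMillis - start)
  result
}

mapped.count   // triggers evaluation of both the flatMap and the map
println("flatMap took " + durationAccumulator.value + " ms, map took " + mapDuration.value + " ms")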

Disclaimers:

  • Of course, this will not include the time Spark spent shuffling things around and doing the actual counting - for that, indeed, the Spark UI is your best resource.
  • Note that accumulators are sensitive to things like retries - a retried task will update the accumulator twice.

Style Note: You can make this code more reusable by creating a measure function that "wraps" around any function and updates a given accumulator:

// write this once:
import org.apache.spark.util.LongAccumulator

def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => {
  val t1 = System.currentTimeMillis
  val result = action(input)
  val t2 = System.currentTimeMillis
  acc.add(t2 - t1)
  result
}

// use it with any transformation:
rdd.flatMap(measure(doSomething, durationAccumulator))
Tzach Zohar
  • 1
    Yea that's a good idea! But not so accurate because as you said "this will not include the time Spark spent shuffling things around and doing the actual counting". Interesting anyway. – Matt Apr 20 '17 at 17:59
  • I have the same problem but I want to measure a "joinWithCassandraTable" transformation, so I can't use accumulators inside of this method, any other alternative? – Guille May 13 '18 at 21:12
  • What is the difference between this approach and `spark.time(myFunction)`? – ScalaBoy May 31 '18 at 12:10
  • `spark.time(myFunction)` runs entirely on the driver, it can't be used for functions that transform the actual distributed data on the worker nodes. – Tzach Zohar May 31 '18 at 13:04
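
To illustrate the difference mentioned in the last comment - a small sketch, assuming a SparkSession named spark: spark.time measures the wall-clock time of a whole action from the driver (including shuffles and the count itself), while the accumulator only sums the time spent inside your function on the executors.

// wall-clock time of the whole action, measured on the driver
spark.time {
  edges.count
}

// per-record function time, summed across executors
println("time spent inside the flatMap function: " + durationAccumulator.value + " ms")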
0

The Spark Web UI records every single action, and even reports the times of every stage of that action - it's all in there! You need to look through the Stages tab, not the Jobs tab. I've found it's only usable, though, if you compile and submit your code; it's useless in the REPL. Are you using the REPL by any chance?
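
One trick that makes the Stages tab easier to navigate (a sketch, assuming a plain SparkContext named sc) is to set a job description or job group right before the action, so the resulting job and its stages are labelled in the UI:

// label the next action so its job and stages are easy to find in the Web UI
sc.setJobDescription("count after flatMap/union")
edges.count

// or group several related actions under one identifier
sc.setJobGroup("timing-experiment", "edges pipeline")
edges.count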

Ben Horsburgh