
I want to use an accumulator to gather some stats about the data I'm manipulating in a Spark job. Ideally, I would do that while the job computes the required transformations, but since Spark will re-compute tasks in certain cases, the accumulators would not reflect true metrics. Here is how the documentation describes this:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

This is confusing since most actions do not allow running custom code (where accumulators can be used), they mostly take the results from previous transformations (lazily). The documentation also shows this:

val acc = sc.accumulator(0)
data.map { x => acc += x; f(x) }
// Here, acc is still 0 because no actions have caused the `map` to be computed.

But if we add data.count() at the end, would this be guaranteed to be correct (have no duplicates) or not? Clearly acc is not used "inside actions only", as map is a transformation. So it should not be guaranteed.
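For concreteness, here is a minimal sketch of the chain in question (f is replaced with the identity so it runs standalone, and note that the mapped RDD itself has to be the one counted for the map to run at all):

val acc = sc.accumulator(0)
val data = sc.parallelize(1 to 10)
val mapped = data.map { x => acc += x; x }  // accumulator updated inside a transformation
mapped.count()                              // the action that (lazily) forces the map
// acc.value is now 55, but is that guaranteed under retries and speculation?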

On the other hand, discussions on related Jira tickets talk about "result tasks" rather than "actions", for instance here and here. This seems to indicate that the result would indeed be guaranteed to be correct, since we are using acc immediately before an action and thus it should be computed as a single stage.

I'm guessing that this concept of a "result task" has to do with the type of operations involved, it being the last one in the chain and including an action, as in this example, which shows how several operations are divided into stages (in magenta, image taken from here):

[Image: a job whose operations are divided into multiple stages]

So hypothetically, a count() action at the end of that chain would be part of the same final stage, and I would be guaranteed that accumulators used in the last map will not include any duplicates?
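To make that stage split concrete, here is a hypothetical chain annotated with where the shuffle boundary falls (the input path and operations are made up for illustration):

val acc = sc.accumulator(0)
val counts = sc.textFile("input.txt")              // hypothetical input path
  .map(line => (line.split(",")(0), 1))            // stage 1: narrow ops pipelined together
  .reduceByKey(_ + _)                              // shuffle boundary: stage 1 ends here
  .map { case (k, v) => acc += v; (k, v) }         // stage 2: the result stage
counts.count()                                     // the action runs within that same final stage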

Clarification around this issue would be great! Thanks.

Daniel Langdon
  • Well, bounty period ended and I still don't really know the true answer, so awarding it to the highest-commented answer so far :-S – Daniel Langdon Apr 22 '15 at 15:48
  • data.count will not run data.map(...), but this will: `val data2 = data.map(x => acc += x; f(x)); data2.count()` – Ajay Gupta Oct 04 '15 at 15:19

3 Answers


To answer the question "When are accumulators truly reliable?":

Answer: when they are updated inside an action.

As per the documentation, when an accumulator is updated inside an action, Spark applies each task's update only once, even if the task is restarted.

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

And actions do allow running custom code.

For example:

// ip is an RDD of strings here; foreach is an action, so each task's update is applied only once.
val accNotEmpty = sc.accumulator(0)
ip.foreach { x =>
  if (x != "") {
    accNotEmpty += 1
  }
}

But why is a map followed by an action (viz. a result task) not reliable for an accumulator update?

  1. A task fails because of an exception in the code. Spark retries it up to 4 times (the default number of attempts). If the task fails every time, the job fails with an exception. If a retry succeeds, Spark continues and applies the accumulator update only from the successful attempt; updates from the failed attempts are ignored.
    Verdict: handled properly.
  2. Stage failure: an executor node crashes through no fault of the user (e.g. a hardware failure) during a shuffle stage. Since shuffle output is stored locally, it is lost when the node goes down. Spark goes back to the stage that generated the shuffle output, works out which tasks need to be rerun, and executes them on one of the nodes that is still alive. After the missing shuffle output is regenerated, the stage that produced the map output has executed some of its tasks multiple times, and Spark counts the accumulator updates from all of them.
    Verdict: not handled in a result task; the accumulator will give a wrong value.
  3. If a task is running slowly, Spark can launch a speculative copy of that task on another node.
    Verdict: not handled; the accumulator will give a wrong value.
  4. A cached RDD is too big to stay in memory, so its partitions get evicted. Whenever the RDD is used again, the map operation is re-run to recompute it, and the accumulator gets updated again.
    Verdict: not handled; the accumulator will give a wrong value.

So the same function may run multiple times on the same data, and Spark therefore gives no guarantee that an accumulator updated inside a map operation is updated exactly once (a sketch of this double counting follows below).
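For instance, here is a minimal sketch (Spark 1.x accumulator API, made-up output path) of how a map-side accumulator gets double counted as soon as the un-cached mapped RDD feeds two actions:

val acc = sc.accumulator(0)
val data = sc.parallelize(1 to 10)
val mapped = data.map { x => acc += 1; x }   // update inside a transformation, RDD not cached

mapped.count()                               // first evaluation: acc.value == 10
mapped.saveAsTextFile("/tmp/acc-demo")       // second evaluation re-runs the map: acc.value == 20

Caching mapped would avoid the recomputation in the happy path, but eviction or node loss can still force a re-run (point 4 above).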

So it is better to update accumulators inside an action in Spark.

To learn more about accumulators and their issues, refer to this blog post by Imran Rashid.

Ajay Gupta
  • Hello! You are basically quoting the same thing I had in the question itself and @Daniel Darabos's answer, but I don't think that is the whole picture, given how the documentation seems to be in conflict with itself and with other expert analysis from Cloudera. Are you a Spark code/design contributor or just figuring it out from the outside like the rest of us? – Daniel Langdon Oct 04 '15 at 15:07
  • @DanielL.: No, I am just a Spark user, but I have been working with it for quite a long time. I actually wanted to add the foreach() part to the answer because an action can run custom code, so that future readers can understand accumulators well. – Ajay Gupta Oct 04 '15 at 15:20
  • Sure, but I think you are mistaken (and the docs are unclear). In fact, my current understanding is that accumulators are indeed reliable on a "result task" rather than an action. For instance, if you just do `sc.readFile(...).map(... ...).saveAsFile(...)` each task will be computed only once and accumulators will be reliable, as the entire operation happens as a single unit under lazy evaluation, with no intermediate results that might be rerun (speculatively or not). My experience reflects this so far, that's why I was looking for an authoritative answer. – Daniel Langdon Oct 04 '15 at 15:40
  • Yup, but if an executor node fails while doing the operation, then I think the result will vary, and that is the reason why map + action = result should not be used. `val a = sc.readFile(...); val b = a.map(......); b.saveAsTextFile(...); val c = b.map(....); println(c.count());` Now if you check the value, it will be double the accumulator's original value, as that map is used twice in the operation. If you use the accumulator in an action, then you can be sure it will be called once. And I am not raising the cache issue here: if the RDD is evicted, it will rerun the map again. – Ajay Gupta Oct 04 '15 at 15:56
  • But that is indeed my point, given that example, you can RELY on the accumulator being 2x :-) – Daniel Langdon Oct 05 '15 at 00:29
  • @DanielL. : Good [blog post](http://imranrashid.com/posts/Spark-Accumulators/) on Spark Accumulator By Imran Rashid – Ajay Gupta Oct 05 '15 at 03:41
  • @AjayGupta I think that @DanielL is right. In fact, in your last example, the counter is doubled because you execute two actions, `save` and `count`, so you evaluate the DAG two times. – angelcervera May 24 '18 at 14:11

Accumulator updates are sent back to the driver when a task is successfully completed. So your accumulator results are guaranteed to be correct when you are certain that each task will have been executed exactly once and each task did as you expected.

I prefer relying on reduce and aggregate instead of accumulators, because it is fairly hard to enumerate all the ways tasks can be executed (a sketch of that alternative follows the list below).

  • An action starts tasks.
  • If an action depends on an earlier stage and the results of that stage are not (fully) cached, then tasks from the earlier stage will be started.
  • Speculative execution starts duplicate tasks when a small number of slow tasks are detected.
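As a sketch of the reduce/aggregate alternative mentioned above (the data and the "count the evens" stat are just illustrative), the result travels back through the action itself, so a retried or speculative task cannot double count:

val rdd = sc.parallelize(1 to 10, 2)
val evenCount = rdd.aggregate(0)(
  (count, x) => if (x % 2 == 0) count + 1 else count,  // fold an element into a partition's count
  (c1, c2) => c1 + c2                                   // combine per-partition counts
)
assert(evenCount == 5)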

That said, there are many simple cases where accumulators can be fully trusted.

val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc.value == 10)

Would this be guaranteed to be correct (have no duplicates)?

Yes, if speculative execution is disabled. The map and the count will be a single stage, so like you say, there is no way a task can be successfully executed more than once.
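Speculative execution is off by default and controlled by the spark.speculation setting, so the single-stage guarantee above assumes it stays that way. A minimal sketch of pinning it down explicitly (the app name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("accumulator-demo")       // illustrative name
  .set("spark.speculation", "false")    // no speculative duplicate tasks
val sc = new SparkContext(conf)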

But an accumulator is updated as a side-effect. So you have to be very careful when thinking about how the code will be executed. Consider this instead of accumulating.count:

// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc.value == 2)

This will also create one task for each partition, and each task will be guaranteed to execute exactly once. But the code in map will not get executed on all elements, just the first one in each partition.

The accumulator is like a global variable. If you share a reference to the RDD that can increment the accumulator then other code (other threads) can cause it to increment too.

// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
                            // It may trigger the calculation
                            // any number of times.
accumulating.count
assert(acc.value >= 10)
Daniel Darabos
  • I understand those cases. But how then do you explain the official docs: "For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value"??? Surely they have to be reliable in SOME case given this? I don't think "never" is the answer, it's probably more complex than that... – Daniel Langdon Apr 17 '15 at 12:53
  • You're right. I just tested it and accumulator updates from failed tasks are not counted. Then I suppose if you have speculative execution disabled, accumulators may be trustworthy as long as you're sure there is nothing that can trigger the calculation more than once. I'll take a closer look at the code. I may have to revise my deep distrust of accumulators :). – Daniel Darabos Apr 17 '15 at 13:58
  • You and me both! ;-) My look at the code left me with more questions than answers, but let me know if you do reach some conclusion. You get the bounty if you do hehe. – Daniel Langdon Apr 17 '15 at 15:03
  • Yah, I know the driver keeps track of failed tasks (retried) and can gain more coarse grained control over the updates. Also, I believe the reporting is not sent back to the driver until all of the worker tasks are done. I'll try to dig in more in a day or so, though – Justin Pihony Apr 18 '15 at 20:22
  • Thanks for the bounty! I've rewritten the answer after reading the code. I hope it's more deserving now. I'd still avoid accumulators, because it's so hard to reason about them in the general case. – Daniel Darabos Apr 27 '15 at 20:08

I think Matei answered this in the referred documentation:

As discussed on https://github.com/apache/spark/pull/2524 this is pretty hard to provide good semantics for in the general case (accumulator updates inside non-result stages), for the following reasons:

  • An RDD may be computed as part of multiple stages. For example, if you update an accumulator inside a MappedRDD and then shuffle it, that might be one stage. But if you then call map() again on the MappedRDD, and shuffle the result of that, you get a second stage where that map is pipelined. Do you want to count this accumulator update twice or not?

  • Entire stages may be resubmitted if shuffle files are deleted by the periodic cleaner or are lost due to a node failure, so anything that tracks RDDs would need to do so for long periods of time (as long as the RDD is referenceable in the user program), which would be pretty complicated to implement.

So I'm going to mark this as "won't fix" for now, except for the part for result stages done in SPARK-3628.
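A minimal sketch of the first scenario Matei describes (an un-cached mapped RDD pipelined into two different shuffle stages, so its accumulator updates are applied once per recomputation; the key functions are made up):

val acc = sc.accumulator(0)
val mapped = sc.parallelize(1 to 100).map { x => acc += 1; x }  // not cached

mapped.map(x => (x % 10, x)).reduceByKey(_ + _).count()  // first job pipelines `mapped`: acc.value == 100
mapped.map(x => (x % 7, x)).reduceByKey(_ + _).count()   // second job pipelines it again: acc.value == 200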

Justin Pihony
  • I did read that. But the definition of what constitutes a "result" stage is not clear. I used this text to compose the question and infer the alternatives, but I have no way to validate whether the assumptions and inferences are correct. I was hoping for someone who understands the process to validate it (and possibly explain a little further). Thanks though! – Daniel Langdon Apr 07 '15 at 16:42