10

I'm trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and am seeing lots of SparkExceptions (caused by ConcurrentModificationExceptions) being logged. The application runs without these errors when I start a local Spark instance.

I've seen reports of similar errors when using accumulators and my code is indeed using a CollectionAccumulator, however I have placed synchronized blocks everywhere I use it, and it makes no difference. The accumulator-related code looks like this:

class MySparkClass(sc : SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord) = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch() = {
        synchronized {
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}

The exceptions don't cause the application to fail, however when endOfBatch is called and the code tries to read values out of the accumulator it is empty and processIt is never called.

We are using HDInsight version 3.6 with Spark version 2.3.0

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more

The following code is a more self-contained example that reproduces the problem. MyRecord is a simple case class containing only numeric values. The code runs without error locally, but on an HDInsight cluster it produces the error above.

object MainDemo {
    def main(args: Array[String]) {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for(i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}
Oli
  • 9,766
  • 5
  • 25
  • 46
codebox
  • 19,927
  • 9
  • 63
  • 81
  • This might sound like a stupid suggestion but I'm not entire convinced that this error comes from your accumulator. So there are two tests I'd like to suggest: 1) make a fake run of your job with `MySparkClass` modified in such a way that all the (fake) data is pre-added to your accumulator at the constructor and then it is never modified by `add`. 2) make a test run with a code modified to use copy-on-write logic: make `myAccumulator` **`var`** instead of `val` and used `copy`-`add`-assign cycle instead of just `add`. This is 100% thread safe but very slow. I bet errors will be still there. – SergGr Nov 29 '18 at 21:46
  • link to bug report raised on Apache site: https://issues.apache.org/jira/browse/SPARK-26183 – codebox Jun 21 '19 at 10:10

2 Answers2

1

I doubt if having synchronized block really helps. CustomeAccumulators or all other accumulator are not thread-safe. They do not really have to since the DAGScheduler.updateAccumulators method that the spark driver uses to update the values of accumulators after a task completes (successfully or with a failure) is only executed on a single thread that runs scheduling loop. Besides that, they are write-only data structures for workers that have their own local accumulator reference whereas accessing the value of an accumulator is only allowed by the driver. And when you say that it works in local mode because it is single JVM but in cluster mode, they are different JVM and java instance, PRC calls are being triggered to enable the communication.

How your MyRecord object looks like and if you just end your line with .value rather having an iterator over it will help. Just try.

myAccumulator.value
H Roy
  • 597
  • 5
  • 10
  • The `add` method is called from Executor code and `endOfBatch` is called from driver code. I'm not sure what you are asking re local mode, but when I run the app on my local PC it works, using master url of `local[4]` - if you need further info just ask. – codebox Nov 29 '18 at 14:04
  • I need to iterate over the values that have been added to the accumulator, if I make the edit you suggest then how will `processIt` get called? The `MyRecord` class is a simple case class with various numeric and boolean fields – codebox Nov 29 '18 at 15:19
  • Can you show us the code that uses your class (or at least a minimal example that reproduces the error)? – Oli Nov 30 '18 at 07:07
  • @Oli I've added an example to the question – codebox Nov 30 '18 at 10:04
  • That's strange... I managed to run your code locally as well and got the expected behavior. Out of curiosity, why are you using an accumulator to collect that much data to the driver? – Oli Dec 02 '18 at 10:45
-1

It makes sense to read the accumulator only after some action on the RDD has been called (collect or count).

Also you don't need to synchronize on the accumulator since an independent copy of it will be allocated per partition.

simpadjo
  • 3,947
  • 1
  • 13
  • 38