
In the following code, is the BLOCK 2 loop guaranteed to be executed only after all the executor tasks spawned by BLOCK 1 have finished, or is it possible for it to run while some of the executor tasks are still running?

If it is possible for the two blocks to run concurrently, what is the best way to prevent this? I need to process the contents of the accumulator, but only once all the executors have finished.

When running with a master URL of `local[4]` as shown, BLOCK 2 appears to wait for BLOCK 1 to finish; however, when running with a master URL of `yarn` I am seeing errors which suggest that BLOCK 2 is running concurrently with the executor tasks in BLOCK 1.

import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object Main {
    def main(args: Array[String]): Unit = {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myRecordAccumulator = sparkContext.collectionAccumulator[MyRecord]

        // BLOCK 1: each executor task adds its records to the accumulator
        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for (i <- 1 to 100000) {
                val record = buildRecord()
                myRecordAccumulator.add(record)
            }
        })

        // BLOCK 2: iterate over the accumulated records on the driver
        myRecordAccumulator.value.asScala.foreach((record: MyRecord) => {
            processIt(record)
        })
    }
}
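
For comparison, here is a minimal sketch of an accumulator-free version of the same flow (assuming `buildRecord` returns serializable `MyRecord` instances and that all the records fit in driver memory). Since `collect()` blocks until every task has finished, the processing loop could not overlap with the executor work:

// Accumulator-free sketch: tasks return their records as an RDD;
// collect() blocks until all tasks complete, then the driver
// processes the collected array.
val records = sparkContext.binaryFiles("/my/files/here")
    .flatMap(_ => (1 to 100000).map(_ => buildRecord()))
    .collect()

records.foreach(processIt)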
  • I am pretty certain that BLOCK 2 will not be executed until all the records have been added to the accumulator. Could you share the errors you are seeing so that we can have a better understanding of what is going on? – Oli Nov 29 '18 at 16:24
  • _or is it possible for it to run while some of the executor tasks are still running?_ - it isn't. `RDD.foreach` is a blocking method, so your program won't proceed until it returns. If there is a problem, it is either in the missing parts (`buildRecord`, `processIt`) or in the very inefficient overall flow, with all the data collected to the driver. – 10465355 Nov 29 '18 at 18:55
  • @Oli The actual error is detailed in this question: https://stackoverflow.com/questions/53484774/concurrentmodificationexception-when-using-spark-collectionaccumulator I thought this might be the cause. – codebox Nov 29 '18 at 19:26
  • I am running into the same problem. Any solutions? – formath Sep 28 '22 at 08:22
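
One workaround sometimes suggested for a ConcurrentModificationException like the one linked in the comments above (an assumption here, not a confirmed fix) is to snapshot the accumulator's value into a local immutable collection before iterating, so the loop never walks a list that anything else might still be mutating:

// Hypothetical workaround: copy the accumulator's contents into an
// immutable local list on the driver, then iterate over the copy.
import scala.collection.JavaConverters._
val snapshot = myRecordAccumulator.value.asScala.toList
snapshot.foreach(processIt)

Note that the copy itself can still race if executor tasks are genuinely still running, so this treats the symptom rather than the cause.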

0 Answers