
I have a Spark Scala program that uses a REST API to get data batch by batch, and once all the data is retrieved I operate on it.

Current Program:

  • For each batch, create an RDD and merge it with the RDD accumulated from the previous API calls via rdd.union(currentRdd).

  • Operate on the final RDD.

A simple program to reproduce the issue:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Union test").setMaster("local[1]")
      val sc = new SparkContext(conf)
      val limit = 1000
      var rdd: RDD[Int] = sc.emptyRDD[Int]
      for (x <- 1 to limit) {
        val currentRdd = sc.parallelize(x to x + 3)
        rdd = rdd.union(currentRdd) // each union adds another level to the lineage
      }
      println(rdd.sum())
    }

Problem: when the number of batches is high, the program throws a StackOverflowError:

    Exception in thread "main" java.lang.StackOverflowError
        at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply

I assume that as the number of batches increases, the RDD dependency graph becomes deeper and more complex, which is what triggers the error.

What is the best way to resolve this problem?

mjlowky

1 Answer


There is already SparkContext.union that knows how to properly compute a union of multiple RDDs:

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = sc.union(rdds)
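
Applied to the question's loop, where batches arrive one API call at a time, this could look roughly as follows (a sketch only; the per-batch REST call is stubbed out with parallelize, as in the question):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD

    // Collect each batch RDD as it arrives, then build a single UnionRDD at the end.
    val batches = ArrayBuffer.empty[RDD[Int]]
    for (x <- 1 to limit) {
      batches += sc.parallelize(x to x + 3) // stand-in for the per-batch REST call
    }
    val rdd = sc.union(batches) // one flat union instead of a chain
    println(rdd.sum())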

Alternatively, you could try using this helper function to avoid the creation of a long chain of unions:

    val rdds = List.tabulate(limit + 1)(x => sc.parallelize(x to x + 3))
    val rdd = balancedReduce(rdds)(_ union _)

The reason why it should work is essentially the same as in the linked answer: an O(n)-deep chain of unions blows the stack, while an O(log(n))-deep binary tree of unions does not.
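
In case the link goes stale, here is a rough sketch of what such a balancedReduce helper could look like (this particular definition is an assumption on my part, not necessarily the one from the linked answer):

    // Assumed sketch: reduce pairwise so the tree of `op` applications is
    // O(log n) deep, instead of an O(n)-deep left-leaning chain.
    def balancedReduce[A](xs: Seq[A])(op: (A, A) => A): A = xs match {
      case Seq()  => throw new IllegalArgumentException("empty sequence")
      case Seq(x) => x
      case _ =>
        val (left, right) = xs.splitAt(xs.size / 2)
        op(balancedReduce(left)(op), balancedReduce(right)(op))
    }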

Andrey Tyukin