
I am trying to write an iterative algorithm with Spark. The algorithm contains one main loop in which different Spark commands for parallelism are used. If only one Spark command is used in each iteration, everything works fine. When more than one command is used, the behaviour of Spark gets very strange. The main problem is that a map command on an RDD with 2 items does not result in 2 function calls, but in many, many more.

It seems as if, in iteration x, Spark executes every command from iterations 1 to x-1 again. And not only in the last iteration of the loop, but in every single iteration of the loop!

I built a small example to reproduce the behaviour (with Java 1.8 and Spark 1.6.1).

First, the data structure that is used in the RDD:

public class Data implements Serializable {
    private static final long serialVersionUID = -6367920689454127925L;
    private String id;
    private Integer value;

    public Data(final String id, final Integer value) {
        super();
        this.id = id;
        this.value = value;
    }

    public String getId() {
        return this.id;
    }

    public Integer getValue() {
        return this.value;
    }

    public void setValue(final Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Data [id=" + this.id + ", value=" + this.value + "]";
    }
}

For the min command we use a comparator:

public class MyComparator implements java.util.Comparator<Data>, Serializable {

    private static final long serialVersionUID = 1383816444011380318L;

    private static final double EPSILON = 0.001;

    public MyComparator() {
    }

    @Override
    public int compare(final Data x, final Data y) {
        if (Math.abs(x.getValue() - y.getValue()) < EPSILON) {
            return 0;
        } else if (x.getValue() < y.getValue()) {
            return -1;
        } else {
            return 1;
        }
    }

}

And now the main program with the algorithm:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Job implements Serializable {

    private static final long serialVersionUID = -1828983500553835114L;

    // Spark Settings
    private static final String APPNAME = "DebugApp - Main";
    private static final String SPARKMASTER = "local[1]";
    private static final int MAX_ITERATIONS = 4;

    public Job() {
    }

    public static void main(final String[] args) {
        final Job job = new Job();
        job.run();
    }

    public void run() {
        final JavaSparkContext sparkContext = createSparkContext();
        final List<Data> dataSet = new ArrayList<Data>();
        dataSet.add(new Data("0", 0));
        dataSet.add(new Data("1", 0));

        JavaRDD<Data> dataParallel = sparkContext.parallelize(dataSet);

        // We use an accumulator to count the number of calls within the map command
        final Accumulator<Integer> accum = sparkContext.accumulator(0);

        final MyComparator comparator = new MyComparator();
        for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
            // If the item which should be updated is selected using the iteration counter everything works fine...
            // final String idToUpdate = new Integer(iterations % 2).toString();

            // ..., but if the element with the minimal value is selected the number of executions in the map command increases.
            final String idToUpdate = dataParallel.min(comparator).getId();
            dataParallel = dataParallel.map(data -> {
                accum.add(1); // Counting the number of function calls.
                return updateData(data, idToUpdate);
            });
        }

        final List<Data> resultData = dataParallel.collect();
        System.out.println("Accumulator: " + accum.value());
        for (Data data : resultData) {
            System.out.println(data.toString());
        }
    }

    private Data updateData(final Data data, final String id) {
        if (data.getId().equals(id)) {
            data.setValue(data.getValue() + 1);
        }
        return data;
    }

    private JavaSparkContext createSparkContext() {
        final SparkConf conf = new SparkConf().setAppName(APPNAME).setMaster(SPARKMASTER);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "de.eprofessional.bidmanager2.engine.serialization.KryoRegistratorWrapper");
        return new JavaSparkContext(conf);
    }
}

I would expect 2 function calls for each iteration, which is the case if the item to update is selected using the iteration counter (see Accumulator Result 1). But if the element is selected using the min command, we obtain different results (see Accumulator Result 2):

+----------------+----------------------+----------------------+
| MAX_ITERATIONS | Accumulator Result 1 | Accumulator Result 2 |
+----------------+----------------------+----------------------+
|              1 |                    2 |                    2 |
|              2 |                    4 |                    6 |
|              3 |                    6 |                   12 |
|              4 |                    8 |                   20 |
+----------------+----------------------+----------------------+

Does someone have an explanation for the additional calls in the map command?

  • Really intriguing, but I cannot understand very well the difference in the code. Could you comment a little more? – Vale Jun 10 '16 at 12:55
  • The difference between the two accumulator results depends on how the element that should be updated is selected. If the element (or its id, which is 0 or 1) is determined by `final String idToUpdate = new Integer(iterations % 2).toString();` the result is correct (Accumulator 1), because in each iteration there are 2 executions. If we use the min command on the rdd (`final String idToUpdate = dataParallel.min(comparator).getId();`) the rdd is recomputed from the beginning over and over again, which results in too many function calls. – Christian Jun 10 '16 at 14:33

1 Answer


Operations on RDDs define what is called a "lineage". Each RDD has a reference to its parent (or parents, in the case of e.g. a join). This lineage is visited when the RDD is materialized. That forms the basis of resiliency in RDDs: Spark can re-create all operations on a dataset to arrive at a result by executing said lineage on a given partition of data.

What's happening here is that we keep chaining .map calls. If we unroll the loop, we see something like:

iter1 -> rdd.map(f)
iter2 -> rdd.map(f).map(f) 
iter3 -> rdd.map(f).map(f).map(f)
...

We can see this by calling rdd.toDebugString() within the loop.
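
For example, a hypothetical debug print added inside the loop from the question (reusing the dataParallel variable and the loop counter from the code above) shows one additional MapPartitionsRDD appearing in the lineage per iteration:

System.out.println("Iteration " + iterations + ":\n" + dataParallel.toDebugString());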

So, bottom line: each pass adds another lineage step on top of the previous one. That is also where the numbers in the table come from: in iteration k the min action recomputes the k-1 maps chained so far (2(k-1) function calls on the 2-element RDD), and the final collect adds another 2 * MAX_ITERATIONS, which yields exactly the n(n+1) totals of Accumulator Result 2. If we want to break that lineage, we should checkpoint the RDD at each iteration to 'remember' the last intermediate result. cache has a similar effect, except that it is not guaranteed that the evaluation stops there (if there is not enough memory to hold the cached data, materializing the RDD may still re-evaluate the lineage).
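
A minimal sketch of how the loop from the question could be adjusted, assuming the same variables as above (sparkContext, dataParallel, comparator, accum) and the Spark 1.6 Java API; the checkpoint directory is just a placeholder path:

// Only needed if checkpoint() is used instead of cache(); the path is a placeholder.
sparkContext.setCheckpointDir("/tmp/spark-checkpoints");

for (int iterations = 0; iterations < MAX_ITERATIONS; iterations++) {
    final String idToUpdate = dataParallel.min(comparator).getId();
    dataParallel = dataParallel.map(data -> {
        accum.add(1);
        return updateData(data, idToUpdate);
    });
    // The next action (the min() of the following iteration, or the final collect())
    // materializes and caches this RDD, so later computations read the cached
    // partitions instead of recomputing the whole chain of maps.
    dataParallel.cache();
    // Alternatively, cut the lineage entirely (the data is written to the checkpoint dir):
    // dataParallel.checkpoint();
}

In a longer-running job one would probably also unpersist the RDD from the previous iteration once the new one is cached, so that old copies do not pile up in memory.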

  • Thank you very much. Your answer indeed helped to solve the posted problem by adding a `dataParallel.cache();` at the end of the for-loop. But I still do not understand why this is necessary and why Spark recomputes the rdd completely in each iteration. This would mean that I have to add a cache command after each recomputation of an rdd so that it is not recomputed from the beginning when I execute commands on it. Still, I find this very confusing and I have not found good documentation on this issue. Do you know of any explanations? – Christian Jun 10 '16 at 14:21
  • @Christian Spark uses _lazy evaluation_, which means it will only compute your RDDs when an **Action** is called ([refer to this](http://spark.apache.org/docs/latest/programming-guide.html#transformations)). Hence, the transformations are appended to the RDD lineage. If you used a cache for each transformation, you would fill your memory in no time, given enough data. Cache strategies usually depend on how long an RDD takes to compute and on whether a piece of code inside the transformation should only run once per RDD (save to file, send over the network...). – Vale Jun 10 '16 at 14:47
  • @Christian I forgot: refer to [this answer](http://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd). – Vale Jun 10 '16 at 14:48
  • @Vale: Thanks. Now I understand the reason behind the lazy evaluation, and the accumulator result makes sense now and is not strange any more ;-). – Christian Jun 10 '16 at 15:09
  • @Vale *lazy evaluation* is indeed an important concept in this discussion. In iterative algos, the idea is to fan out heavy computations and reduce to a certain set of indicators (score, slope, estimations, ...), then use that in the next iteration. The issue above is that the result of the map is not used within the loop. – maasg Jun 10 '16 at 15:09