
In High Performance Spark, a textbook currently in early release, the authors note that:

To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions don’t force loading the entire partition in-memory (e.g. implicitly converting to a list). Iterators have many methods we can write functional style transformations on, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call these iterator-to-iterator transformations.

However, the textbook lacks good examples of mapPartitions or similar variations of the method, and there are few good code examples online, most of which are in Scala. For example, here is Scala code using mapPartitions, written by zero323 in answer to How to add columns into org.apache.spark.sql.Row inside of mapPartitions:

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

Unfortunately, Java doesn't provide anything as nice as iter.map(...) for iterators. So the question is: how can one effectively write iterator-to-iterator transformations with mapPartitions without collecting the entire partition in memory as a list?

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    // Materializes the whole partition into a list before handing back an iterator over it
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

This seems to be the general syntax for using mapPartitions in Java examples, but I don't see how it could be the most efficient approach when the JavaRDD holds tens of thousands of records (or more, since Spark is meant for big data). You end up building a list of every object in the iterator, just to turn it back into an iterator, which suggests that a map function of some sort would be much more efficient here.

Note: while these 8 lines of code using mapPartitions could be written as 1 line with a map or flatMap, I'm intentionally using mapPartitions to take advantage of the fact that it operates over each partition rather than each element in the RDD.
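
For example, the reason I reach for mapPartitions looks something like the following, where ExpensiveResource and its transform method are placeholders for whatever per-partition setup and per-record work I need; the list-building is exactly the part I'd like to avoid:

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    // The setup cost is paid once per partition rather than once per record,
    // which a plain map over the RDD can't offer
    ExpensiveResource resource = new ExpensiveResource();
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        out.add(resource.transform(iter.next())); // hypothetical per-record work using the shared resource
    }
    return out.iterator();
});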

Any ideas, please?


1 Answer


One way to prevent forcing the "materialization" of the entire partition is to convert the Iterator into a Stream and then use Stream's functional API (e.g. its map function).

The question "How to convert an iterator to a stream?" suggests a few good ways to convert an Iterator into a Stream, so taking one of the options suggested there we end up with:

// assumes: import java.util.stream.StreamSupport;
rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter; // one-shot Iterable view over the partition's iterator
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

This should be an "Iterator-to-Iterator" transformation, because all the intermediate APIs used (Iterable, Stream) are lazily evaluated.
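
To convince yourself of the laziness, here is a small self-contained sketch (plain Java, no Spark; the LazyDemo class and the x * 10 mapping are purely illustrative) in which the source iterator prints whenever an element is actually pulled. Nothing is pulled until the resulting iterator is consumed, and even then only one element at a time:

import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.StreamSupport;

public class LazyDemo {
    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();

        // Wrap the source so we can see exactly when elements are pulled
        Iterator<Integer> noisy = new Iterator<Integer>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public Integer next() {
                Integer n = source.next();
                System.out.println("pulled " + n);
                return n;
            }
        };

        // Same pattern as above: Iterator -> Iterable -> Stream -> Iterator
        Iterable<Integer> iterable = () -> noisy;
        Iterator<Integer> transformed = StreamSupport
                .stream(iterable.spliterator(), false)
                .map(x -> x * 10)
                .iterator();

        System.out.println("nothing pulled yet");
        while (transformed.hasNext()) {
            System.out.println("got " + transformed.next());
        }
    }
}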

EDIT: I haven't tested it myself, but the OP commented, and I quote, that "there is no efficiency increase by using a Stream over a list". I don't know why that is, nor whether it would be true in general, but it's worth mentioning.

Tzach Zohar
  • This looks promising, I'm working on testing it now. Is there an easy way I could make it not one-to-one? In a current `mapPartitions` I have, I only added to the list if a boolean was true after I did some transformation. (It was to avoid a `filter` after chaining `mapPartitions` together.) I'm not sure what I could return in the map function to signal that it should be left out. –  Mar 01 '17 at 19:09
  • If I get this right - just add a `filter` after the map (that would be `Stream.filter`, not `RDD.filter`); see the sketch after these comments. – Tzach Zohar Mar 01 '17 at 19:13
  • Sure, that seems reasonable. –  Mar 01 '17 at 19:13
  • Since the boolean in `stream` is referring to a parameter labeled `parallel`, it might be worth setting this to `true` so that it's more compatible with Spark's API. I'm unsure if having it `false` will actually conflict with Spark trying to run in parallel or not. –  Mar 01 '17 at 19:43
  • Setting it to true sounds ill-advised to me: Spark assumes a 1-to-1 relationship between partitions and threads - each _task_ runs on a single _thread_ and handles a single _partition_. If your data is partitioned properly, you won't benefit from using more threads per partition, the cluster's threads should be utilized properly without it. – Tzach Zohar Mar 01 '17 at 19:57
  • Alright, so I have results for whoever stumbleth upon this thread in the future: using a `Stream` instead of a list is slightly slower. I'm assuming that the memory being used is better controlled, but there is no efficiency increase by using a `Stream` over a list. Thank you for the answer! –  Mar 01 '17 at 19:59
  • @user4728253 actually you want to set it to false. Spark does parallelization by distributing partitions across executor cores, but each partition is processed single-threaded. Setting it to true would actually go against Spark's APIs in this case, as that iterator contains all the items for a single partition and should not be parallelized. – Mohannad Mar 16 '20 at 04:30
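
To make the `Stream.filter` suggestion from the comments concrete, a sketch of that chaining inside `mapPartitions` might look like this (transformRow and shouldKeep are hypothetical stand-ins for the transformation and the boolean check discussed above):

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false) // false: each partition is already processed on a single thread
            .map(s -> transformRow(s))
            .filter(o -> shouldKeep(o)) // drop unwanted records here instead of a separate RDD.filter
            .iterator();
});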