16

The PySpark documentation describes two functions:

mapPartitions(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
   >>> def f(iterator): yield sum(iterator)
   >>> rdd.mapPartitions(f).collect()
   [3, 7]

And ...

mapPartitionsWithIndex(f, preservesPartitioning=False)

   Return a new RDD by applying a function to each partition of this RDD, 
   while tracking the index of the original partition.

   >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
   >>> def f(splitIndex, iterator): yield splitIndex
   >>> rdd.mapPartitionsWithIndex(f).sum()
   6

What use cases do these functions attempt to solve? I can't see why they would be required.

Chris Snow
  • 23,813
  • 35
  • 144
  • 309
  • As the documentation says, the second function `mapPartitionsWithIndex` has two arguments instead of one, besides the first argument is the index, so it may be useful in several cases. – Alberto Bonsanto Nov 11 '15 at 17:17
  • 1
    Generally speaking these are useful when you want to access more than one observation at the time. Usually it means either ordered RDD or partitioned using specific partitioner. Some simple usages examples: http://stackoverflow.com/a/31686744/1560062, http://stackoverflow.com/a/33622083/1560062, http://stackoverflow.com/a/33588287/1560062. Pretty much every common operation in Spark is implemented using one mapPartitions / mapPartitionsWithIndex – zero323 Nov 11 '15 at 17:27
  • 2
    Pretty much every time you go beyond simple `map` - `filter` - `reduce` on of these comes handy. Since partition is a basic unit of concurrency in Spark you can apply arbitrary operation in parallel. These are also useful when you communicate with external systems, especially when you asynchronous processing. – zero323 Nov 11 '15 at 17:46

1 Answers1

40

To answer this question we need to compare map with mapPartitions/mapPartitionsWithIndex (mapPartitions and mapPartitionsWithIndex pretty much do the same thing except with mapPartitionsWithIndex you can track which partition is being processed).

Now mapPartitions and mapPartitionsWithIndex are used to optimize the performance of your application. Just for the sake of understanding let's say all the elements in your RDD are XML elements and you need a parser to process each of them. So you have to take an instance of a good parser class to move ahead with. You could do it in two ways:

map + foreach: In this case for each element, an instance of the parser class will be created, the element will be processed and then the instance will be destroyed in time but this instance will not be used for other elements. So if you are working with an RDD of 12 elements distributed among 4 partitions, the parser instance will be created 12 times. And as you know creating an instance is a very expensive operation so it will take time.

mapPartitions/mapPartitionsWithIndex: These two methods are able to address the above situation a little bit. mapPartitions/mapPartitionsWithIndex works on the partitions, not on the elements (please don't get me wrong, all elements will be processed). These methods will create the parser instance once for each partition. And as you have only 4 partitions, the parser instance will be created 4 times (for this example 8 times less than map). But the function you will pass to these methods should take an Iterator object (to take all the elements of a partition at once as input). So in case of mapPartitions and mapPartitionsWithIndex the parser instance will be created, all elements for the current partition will be processed, and then the instance will be destroyed later by GC. And you will notice that they can improve the performance of your application significantly.

So the bottom-line is, whenever you see that some operations are common to all elements, and in general, you could do it once and could process all of them, it's better to go with mapPartitions/mapPartitionsWithIndex.

Please find the below two links for explanations with code example: https://bzhangusc.wordpress.com/2014/06/19/optimize-map-performamce-with-mappartitions/ http://apachesparkbook.blogspot.in/2015/11/mappartition-example.html

Mrinal
  • 1,826
  • 2
  • 19
  • 31
  • 2
    what about **`map`** + [**`foreachPartition`**](https://stackoverflow.com/questions/30484701/apache-spark-foreach-vs-foreachpartitions-when-to-use-what)? Won't this only instantiate `n_partitions` number of parsers, rather than `n_total_elements`? – ijoseph Dec 27 '19 at 19:43