35

So I am trying to learn Spark using Python (PySpark). I want to know how the function mapPartitions works: what input it takes and what output it gives. I couldn't find any proper example on the internet. Let's say I have an RDD object containing lists, such as the one below.

[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ] 

I want to remove the element 2 from all the lists. How would I achieve that using mapPartitions?

– MetallicPriest

4 Answers

41

mapPartitions should be thought of as a map operation over partitions, not over the elements of the partition. Its input is the set of current partitions and its output will be another set of partitions.

The function you pass to map must take an individual element of your RDD.

The function you pass to mapPartitions must take an iterable of your RDD type and return an iterable of some other or the same type.
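As a minimal sketch of that contract (assuming a SparkContext named `sc` and the RDD from the question), counting elements per partition shows that the function receives one iterator per partition:

rdd = sc.parallelize([[1, 2, 3], [3, 2, 4], [5, 2, 7]], 2)

def count_per_partition(partition):
    # `partition` is an iterator over this partition's elements
    yield sum(1 for _ in partition)

print(rdd.mapPartitions(count_per_partition).collect())
# [1, 2] -- one count per partition (exact split may vary)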

In your case you probably just want to do something like:

def filter_out_2(line):
    # `line` is a single element of the RDD: one list
    return [x for x in line if x != 2]

filtered_lists = data.map(filter_out_2)

If you wanted to use mapPartitions, it would be:

def filter_out_2_from_partition(list_of_lists):
    # `list_of_lists` is an iterator over every element in this partition
    final_iterator = []
    for sub_list in list_of_lists:
        final_iterator.append([x for x in sub_list if x != 2])
    return iter(final_iterator)

filtered_lists = data.mapPartitions(filter_out_2_from_partition)
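A quick sanity check against the data from the question (again assuming a SparkContext `sc`) might look like:

data = sc.parallelize([[1, 2, 3], [3, 2, 4], [5, 2, 7]])
print(data.mapPartitions(filter_out_2_from_partition).collect())
# [[1, 3], [3, 4], [5, 7]]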
– bearrito
  • Why don't you return anything in filterOut2FromPartition function. Secondly, is final some keyword in python? I think you meant to say final.iterator = [] instead of final_iterator. – MetallicPriest Nov 04 '14 at 21:39
  • I tried to implement this but I get the error "list object is not an iterator". Also, I think when you wrote [x for x in line if x != 2], I think you meant [x for x in list if x != 2]. I used list there. – MetallicPriest Nov 05 '14 at 10:27
  • 1
    We should return iter(final_iterator) instead of final_iterator. Fixed the answer. Thank you for your help :). – MetallicPriest Nov 05 '14 at 10:59
  • I'm a newbie learning Spark; why is the below code ```rdd = sc.parallelize([1,2,3,4,5,6,7,8],2) def f(x): yield len(x) rdd.mapPartitions(f).collect()``` not working? – subro Sep 28 '19 at 11:42
  • Just for the record, you don't need to build the entire final iterator all at once for the filter_out_2_from_partition function. Your function could be rewritten as a one line generator statement: `return ( [x for x in sub_list if x != 2] for sub_list in list_of_lists)` If you do it that way, Spark can use its lazy loading to decide if/when to process each row within each partition. – Nolan Barth Feb 14 '22 at 21:42
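(Regarding subro's comment above: that snippet fails because the argument passed to the function is an iterator, and Python iterators have no len(). A hedged rewrite that counts without len():)

def f(partition):
    yield sum(1 for _ in partition)  # count elements; iterators have no len()

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8], 2)
print(rdd.mapPartitions(f).collect())  # [4, 4]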
30

It's easier to use mapPartitions with a generator function using the yield syntax:

def filter_out_2(partition):
    for element in partition:
        if element != 2:
            yield element

filtered_lists = data.mapPartitions(filter_out_2)
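Note this version assumes a flat RDD of numbers. For the question's RDD of lists, a sketch of the same generator approach (the name filter_out_2_per_list is just illustrative) would be:

def filter_out_2_per_list(partition):
    for sub_list in partition:
        yield [x for x in sub_list if x != 2]

data = sc.parallelize([[1, 2, 3], [3, 2, 4], [5, 2, 7]])
print(data.mapPartitions(filter_out_2_per_list).collect())
# [[1, 3], [3, 4], [5, 7]]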
– Narek
  • Is this faster than just returning a list? – cgreen Jan 03 '17 at 22:05
  • 3
    @cgreen the partition contains all of your data. I'm not sure you want to load all of your data into a list. Generators are preferred over lists when you are iterating over data. – Narek Jan 03 '17 at 22:40
  • 1
    @cgreen Generators use less memory, since they generate each item as it's needed, instead of initially having to generate an entire list of objects. So it definitely uses less memory, and therefore is probably faster. [Here is a good explanation of generators in Python](https://medium.freecodecamp.org/python-list-comprehensions-vs-generator-expressions-cef70ccb49db). – Shane Halloran Nov 26 '17 at 22:52
  • This is really helpful but the method is called mapPartition**s** https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitions – nadre Apr 15 '18 at 10:36
1

The per-partition function needs to yield one filtered list per element, since each element of the partition is itself a list:

def filter_out_2(partition):
    for element in partition:
        sec_iterator = []
        for i in element:
            if i != 2:
                sec_iterator.append(i)
        yield sec_iterator

filtered_lists = data.mapPartitions(filter_out_2)
for i in filtered_lists.collect():
    print(i)
– Ravinder Karra
-1
def func(l):
    # `l` is an iterator over the elements of one partition
    for i in l:
        yield i + "ajbf"

mylist = ['madhu', 'sdgs', 'sjhf', 'mad']
rdd = sc.parallelize(mylist)
t = rdd.mapPartitions(func)
for i in t.collect():
    print(i)
for i in t.collect():
    print(i)

In the above code I am able to get data from the 2nd for...in loop. As per generator behaviour it should not yield values once it has been iterated over.
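(The reason: each action such as collect() launches a new Spark job, so func is re-executed on every partition and the generator is created afresh each time; generator exhaustion only applies within a single pass, not across jobs. A hedged sketch if you want the results computed once and reused:)

t = rdd.mapPartitions(func).cache()  # cache so repeated collect() calls reuse the computed data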