0

hello could someone explain to me why mapPartitions reacts differently to those two functions? (I have looked at this this thread and I don't think my problem comes from the fact that my iterable is TraversableOnce as I create it.

L=range(10)
J=range(5,15)
K=range(8,18)

data=J+K+L

def function_1(iter_listoflist):
    final_iterator=[]
    for sublist in iter_listoflist:
        final_iterator.append([x for x in sublist if x%9!=0])
    return iter(final_iterator)  

def function_2(iter_listoflist):
    final_iterator=[]
    listoflist=list(iter_listoflist)
    for i in range(len(listoflist)):
        for j in range(i+1,len(listoflist)):
            sublist=listoflist[i]+listoflist[j]
            final_iterator.append([x for x in sublist if x%9!=0])
            pass
        pass
    return iter(final_iterator)



sc.parallelize(data,3).glom().mapPartitions(function_1).collect()

returns what it should while

sc.parallelize(data,3).glom().mapPartitions(function_2).collect()

returns an empty array, I have the checked the code by returning a list at the end and it does what I want it to.

thanks for your help

Philippe C

Community
  • 1
  • 1
Philippe C
  • 667
  • 2
  • 9
  • 16

1 Answers1

2

It is actually quite simple. listoflist has always length equal to 1. To understand why that is the case you have to think about what is going on when call glom. To quote the docs it returns:

an RDD created by coalescing all elements within each partition into a list.

It means that when you call:

listoflist=list(iter_listoflist)

You get a list with a single element list containing all elements from that partition. Ignoring all the details:

(sc.parallelize(data, 3)
    .glom()
    .mapPartitionsWithIndex(lambda i, iter: [(i, list(iter))])
    .collect())

## [(0, [[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]]),
##     (1, [[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]]),
##     (2, [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])]

It means that

  • range(len(listoflist)) contains a single element equal 0.
  • range(i+1,len(listoflist)) by substitution is an empty range(1, 1)

Hence there is nothing to do and you get an empty iterator.

On a side note all these pass statements as well as iter calls are completely obsolete.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • thank you for your answer and side note I'll take it into account. Is it of length 1 because it is called by mapPartition? When I did my tests it was of length the size of my list or iterator. – Philippe C Nov 05 '15 at 17:52
  • No, it because you're using `glom` – zero323 Nov 05 '15 at 18:07
  • Ok thank you very much, my work arround was to do the two for loops somewhere else and then call parallelize and it worked. If I don't use glom() I have an error message as I try to iterate on ints. is there a way to have the loop inside the function? I am basically trying to compute the lists in parallel but need that they don't mix. thanks again – Philippe C Nov 06 '15 at 07:44
  • Could you ask this a separate question and try to explain exactly what is the logic here? I have this strange feeling that code you've provided doesn't really reflect the expected output. – zero323 Nov 06 '15 at 08:06
  • sure here it is http://stackoverflow.com/questions/33562318/how-to-specify-the-partition-for-mappartition-in-spark – Philippe C Nov 06 '15 at 08:23