
Could someone please help me understand the behaviour of appending map functions to an RDD in a Python for loop?

For the following code:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))

rdd.collect()

I get the output:

[[1, 2, 2, 2], [2, 2, 2, 2], [3, 2, 2, 2]]

Whereas with the following code:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()

I get the expected output:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

I imagine this has something to do with the closure that is passed to the PySpark compiler, but I can't find any documentation about this...
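
As a data point, the same late-binding behaviour appears reproducible in plain Python without Spark, so it does not seem to be Spark-specific (a minimal sketch):

fns = []
for i in range(3):
    # each lambda closes over the variable i itself, not its current value
    fns.append(lambda x: x + [i])

print([f([0]) for f in fns])
# prints [[0, 2], [0, 2], [0, 2]] -- all three lambdas see i's final value, 2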

MarkNS

2 Answers


The solution is to bind the loop variable (in this case i) as a default argument of the lambda, so that its current value is captured when the lambda is defined rather than when it eventually runs. This can be accomplished by

for i in range(3):
    rdd = rdd.map(lambda x, i=i: appender(x, i))

More information about this can be found at lambda function accessing outside variable.
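
For reference, a minimal end-to-end sketch of this fix (assuming a SparkSession named spark, as in the question):

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(3):
    # i=i binds the current value of i as a default argument, so each
    # lambda keeps its own copy instead of sharing the final value of i
    rdd = rdd.map(lambda x, i=i: appender(x, i))

rdd.collect()
# [[1, 0, 1, 2], [2, 0, 1, 2], [3, 0, 1, 2]]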

Interestingly, at least on a local cluster (I have not tested this on distributed clusters), the problem can also be addressed by persisting the intermediate RDD:

for i in range(3):
    rdd = rdd.map(lambda x: appender(x, i))
    rdd.persist()

Both solutions produce

[[1, 0, 1, 2], [2, 0, 1, 2], [3, 0, 1, 2]] 
alta

My best guess is that this is due to lazy evaluation: the maps are not executed until collect() is called, and by then the loop variable i has reached its final value. Also, you had a mismatched range: range(3) yields 0, 1, 2, while your second snippet appends 1, 2, 3.

These two code snippets produce the same output:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

def appender(l, i):
    return l + [i]

for i in range(1, 4):
    # collect() forces evaluation immediately, while i still holds its current value
    rdd = spark.sparkContext.parallelize(rdd.map(lambda x: appender(x, i)).collect())

rdd.collect()

outputs:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

And the second one:

rdd = spark.sparkContext.parallelize([[1], [2], [3]])

rdd = rdd.map(lambda x: appender(x, 1))
rdd = rdd.map(lambda x: appender(x, 2))
rdd = rdd.map(lambda x: appender(x, 3))

rdd.collect()

outputs:

[[1, 1, 2, 3], [2, 1, 2, 3], [3, 1, 2, 3]]

Also, to show what happens in the for loop, here is a simplified example (only inputs 1 and 2) with the appender function modified to print its l argument:
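
A minimal sketch of such a modified appender (assumed, since the exact version is not shown):

def appender(l, i):
    print(l)  # show the partial list each mapper receives
    return l + [i]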

  1. The for loop prints:

    [2]
    [2, 2]
    [1]
    [3]
    [1, 2]
    [3, 2]
    

Note that the first map already appends the second value (i = 2) to every input list, showing that each lambda sees only the final value of i.

  2. Explicitly writing out the mappers prints:

    [1]
    [1, 1]
    [2]
    [2, 1]
    [3]
    [3, 1]
    
Konrad Kostrzewa
  • Hmm, nothing wrong with that range from my (and my python interpreter's) perspective. https://docs.python.org/2/library/functions.html#range – MarkNS Jun 28 '17 at 14:35
  • Parallelizing the rdd.map function is certainly not what I want to do either. Parallelize should be used to distribute an existing collection over the cluster. Bear in mind that this is just test pseudo-code. – MarkNS Jun 28 '17 at 14:39
  • On Python 2.7.12, `for i in range(3): print(i)` prints `0 1 2`, and in the second snippet you put the numbers 1, 2, 3 as input – Konrad Kostrzewa Jun 28 '17 at 14:40