7

I am getting this error but i do not know why. Basically I am erroring from this code:

    a = data.mapPartitions(helper(locations))

where data is an RDD and my helper is defined as:

    def helper(iterator, locations): 
        for x in iterator:
            c = locations[x]
            yield c

(locations is just an array of data points) I do not see what the problem is but I am also not the best at pyspark so can someone please tell me why I am getting 'PipelinedRDD' object is not iterable from this code?

deeformvp
  • 157
  • 1
  • 2
  • 11
  • 1
    You can not iterate on an rdd in the way you do. Pls have a look at http://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scala – Mohan Apr 19 '16 at 10:52
  • @Mohan : thanks I think I get the idea now but I am still getting the same error. I am now calling this: a = data.mapPartitions(lambda iterator: helper(iterator, locations)). What else am I doing wrong? – deeformvp Apr 19 '16 at 21:40

2 Answers2

7

RDD can iterated by using map and lambda functions. I have iterated through Pipelined RDD using the below method

lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")

pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))

pair_result = pairs1.union(pairs2)

pair_result.reduceByKey(lambda a, b: a + ','+ b)

result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]

===> result_ll = [list(elem) for elem in result]

TypeError: 'PipelinedRDD' object is not iterable

Instead of this I replaced the iteration using map function

result_ll = result.map( lambda elem: list(elem))

Hope this helps to modify your code accordingly

Yamur
  • 339
  • 6
  • 20
Aravind Krishnakumar
  • 2,727
  • 1
  • 28
  • 25
2

I prefer the answer that said in another question with below link : Can not access Pipelined Rdd in pyspark

You cannot iterate over an RDD, you need first to call an action to get your data back to the driver. Quick sample:

`>>> test = sc.parallelize([1,2,3])
 >>> for i in test:
     ...    print i
     ... 
     Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     TypeError: 'RDD' object is not iterable`

but for example you can use '.collect()'

`>>> for i in test.collect():
     ...      print i
 1                                                                               
 2
 3`
Mohammad Rahmati
  • 81
  • 1
  • 1
  • 6