I am trying to implement K-means from scratch using PySpark. I perform various operations on RDDs, but when I try to display the result of the final processed RDD, I get an error saying that a PipelinedRDD cannot be iterated. Calls such as .collect() also fail, apparently because of the same pipelined-RDD issue.

from __future__ import print_function
import sys
import numpy as np
def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex

data = SC.parallelize([1, 2, 3, 5, 7, 3, 5, 7, 3, 6, 4, 66, 33, 66, 22, 55, 77])

K = 3
convergeDist = float(0.1)

kPoints = data.takeSample(False, K, 1)
tempDist = 1.0

while tempDist > convergeDist:
    closest = data.map(
        lambda p: (closestPoint(p, kPoints), (p, 1)))

    pointStats = closest.reduceByKey(
        lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))

    newPoints = pointStats.map(
        lambda st: (st[0], st[1][0] / st[1][1]))
    print(newPoints)

    tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints).collect()
    # tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

    for (iK, p) in newPoints:
        kPoints[iK] = p

print("Final centers: " + str(kPoints))

The error I am getting is:

TypeError: 'PipelinedRDD' object is not iterable

OneCricketeer
Ray92
  • You're mixing Spark and Numpy. Pick just one – OneCricketeer Dec 09 '17 at 23:32
  • `kPoints.collect()` will work fine. I don't understand your point about it being an issue – OneCricketeer Dec 09 '17 at 23:34
  • This line seems to be the problem: tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints).collect() ... Apparently you cannot use a for loop to iterate through newPoints (which is a pipelined RDD) – Ray92 Dec 10 '17 at 01:16
  • Things that would normally work on a regular RDD, like a simple collect call, do not seem to work for, say, the newPoints pipelined RDD. – Ray92 Dec 10 '17 at 01:52

1 Answer

You cannot iterate over an RDD directly; you first need to call an action to bring the data back to the driver. A quick example:

>>> test = sc.parallelize([1,2,3])
>>> for i in test:
...    print(i)
... 
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'RDD' object is not iterable

This fails because test is an RDD. If you instead bring the data back to the driver with an action, you get a regular Python object (here, a list) that you can iterate over, for example:

>>> for i in test.collect():
...    print(i)
1
2
3

In short: call an action to bring the data back to the driver, but be careful not to collect too much data, or the driver can run out of memory.
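
Applied to the loop in the question, this means calling collect() on newPoints itself before iterating over it; in the question, .collect() was applied to the result of sum(...), which is never reached because the generator already fails while iterating over the RDD. The following is a minimal sketch rather than a definitive implementation: closestPoint and the variable names come from the question, while the centerShift helper and the kmeans wrapper are hypothetical names introduced here, and data is assumed to be an RDD of plain numbers.

```python
import numpy as np


def closestPoint(p, centers):
    """Return the index of the center nearest to p (squared distance)."""
    return int(np.argmin([np.sum((p - c) ** 2) for c in centers]))


def centerShift(kPoints, newPoints):
    """Total squared movement of the centers.

    Hypothetical helper: newPoints must already be a plain Python list of
    (index, center) pairs, i.e. the result of collect(), not an RDD.
    """
    return sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)


def kmeans(data, K, convergeDist=0.1, seed=1):
    """Hypothetical wrapper around the loop from the question.

    data is assumed to be an RDD of numbers.
    """
    kPoints = data.takeSample(False, K, seed)
    tempDist = float("inf")
    while tempDist > convergeDist:
        closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
        pointStats = closest.reduceByKey(
            lambda a, b: (a[0] + b[0], a[1] + b[1]))
        # collect() is the action: it turns the PipelinedRDD into a list
        # on the driver, which can then be iterated with an ordinary loop.
        # float() avoids integer division on Python 2.
        newPoints = pointStats.map(
            lambda st: (st[0], st[1][0] / float(st[1][1]))).collect()
        tempDist = centerShift(kPoints, newPoints)
        for (iK, p) in newPoints:
            kPoints[iK] = p
    return kPoints
```

With an existing SparkContext sc, a call such as kmeans(sc.parallelize([1, 2, 3, 66, 67, 68]), 2) should return a plain Python list of centers. Only up to K (index, center) pairs are collected per iteration, so the memory caveat above is unlikely to matter in this case.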

xmorera