I am trying to implement K-means from scratch using pyspark. I am performing various operations on rdd's but when i try to display the result of the final processed rdd, some error like "Pipelined RDD's cant be iterated" or something like that and things like .collect() do not work again because of the piplined rdd issue.
from __future__ import print_function
import sys
import numpy as np
def closestPoint(p, centers):
bestIndex = 0
closest = float("+inf")
for i in range(len(centers)):
tempDist = np.sum((p - centers[i]) ** 2)
if tempDist < closest:
closest = tempDist
bestIndex = i
return bestIndex
data=SC.parallelize([1, 2, 3,5,7,3,5,7,3,6,4,66,33,66,22,55,77])
K = 3
convergeDist = float(0.1)
kPoints = data.takeSample(False, K, 1)
tempDist = 1.0
while tempDist > convergeDist:
closest = data.map(
lambda p: (closestPoint(p, kPoints), (p, 1)))
pointStats = closest.reduceByKey(
lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
newPoints = pointStats.map(
lambda st: (st[0], st[1][0] / st[1][1]))
print(newPoints)
tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints).collect()
# tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)
for (iK, p) in newPoints:
kPoints[iK] = p
print("Final centers: " + str(kPoints))
The error I am getting is:
TypeError: 'PipelinedRDD' object is not iterable