I am starting with a Spark DataFrame and creating a vector matrix from it for further analytics processing.

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()

The output is an array of vectors. Some of those vectors contain a null:

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])

From this I want to iterate through the vector matrix and create a LabeledPoint array, labeled 0 (zero) if the vector contains a null and 1 otherwise.

def f(row):
    # Label 1.0 when the vector contains a null, otherwise 0.0
    if None in row:
        return LabeledPoint(1.0, row)
    else:
        return LabeledPoint(0.0, row)

I have tried to iterate through the vector matrix using

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors)  # create a generator of labeled points
next(feature_matrix_labeledPoint)  # run the iteration protocol

but this doesn't work.

TypeError: 'PipelinedRDD' object is not iterable

Any help would be great

Eoin Lane

1 Answer

RDDs are not a drop-in replacement for Python lists. You have to use the actions or transformations that are available on a given RDD. Here you can simply use map:

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint


feature_matrix_vectors = sc.parallelize([
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])

(feature_matrix_vectors
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
    .collect())
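
One caveat worth checking on your own data: as far as I can tell, DenseVector keeps its values in a NumPy float array, which cannot hold None, so a null may end up stored as NaN and a `None in v` test would then never fire. The sketch below is plain Python and assumes a missing entry can show up as either None or NaN. The helper names `has_missing` and `label_for` are hypothetical, not part of PySpark.

```python
import math


def has_missing(values):
    """Return True if any entry is None or NaN (assumes numeric entries)."""
    for x in values:
        if x is None or math.isnan(x):
            return True
    return False


def label_for(values):
    # Same convention as the snippet above: 1.0 when something is missing.
    return 1.0 if has_missing(values) else 0.0


# Plain lists stand in for the vectors so the sketch runs without Spark.
rows = [
    [1.0, 31.0, 5.0, 1935.0, 24.0],
    [1.0, 1231.0, 15.0, 2008.0, None],
    [1.0, 1231.0, 15.0, 2008.0, float("nan")],
]
labels = [label_for(r) for r in rows]
print(labels)

# With Spark, the helper could be used inside map, for example:
# feature_matrix_vectors.map(lambda v: LabeledPoint(label_for(v.toArray()), v))
```

Keeping the check in a small named function also makes it easy to test locally before running it on the cluster.
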
zero323