
I am trying to store a DenseVector into a DataFrame in a new column.

I tried the following code, but got an AttributeError saying 'numpy.ndarray' object has no attribute '_get_object_id'.

from pyspark.sql import functions
from pyspark.mllib.linalg import Vectors

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

vec = Vectors.dense([1.0, 3.0, 2.9])

df.withColumn('vector', functions.lit(vec))

I'm hoping to store a vector per row for computation purposes. Any help is appreciated.

[Python 3.7.3, Spark version 2.4.3, via Jupyter All-Spark-Notebook]

EDIT

I tried to follow the answer here as suggested by Florian, but I could not adapt the udf to take in a custom pre-constructed vector.

from pyspark.mllib.linalg import DenseVector, VectorUDT

conv = functions.udf(lambda x: DenseVector(x), VectorUDT())
# Same with
# conv = functions.udf(lambda x: x, VectorUDT())

df.withColumn('vector', conv(vec)).show()

I get this error:

TypeError: Invalid argument, not a string or column: [1.0,3.0,2.9] of type <class 'pyspark.mllib.linalg.DenseVector'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
cylim
  • Possible duplicate of [Adding a Vectors Column to a pyspark DataFrame](https://stackoverflow.com/questions/49832877/adding-a-vectors-column-to-a-pyspark-dataframe). Does the answer there help you? – Florian Aug 16 '19 at 09:47
  • Thanks @Florian. Any idea how to modify the udf so that I can pass in my own vector? I followed the answer there and tried `udf(lambda x: x, VectorUDT())`, but it didn't work. – cylim Aug 16 '19 at 10:15
  • this seems to work - `df.withColumn('vector', functions.array([functions.lit(k) for k in vec]))` (see the runnable sketch below) – samkart Aug 16 '19 at 11:11
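
For reference, a minimal runnable sketch of samkart's suggestion, using the same setup as the question. Note that `functions.array` produces a plain `array<double>` column rather than a `VectorUDT` column, so it may not suit ML code that expects vectors:

from pyspark.sql import functions
from pyspark.mllib.linalg import Vectors

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])
vec = Vectors.dense([1.0, 3.0, 2.9])

# One lit() per vector element, collected into an array<double> column.
df.withColumn('vector', functions.array([functions.lit(k) for k in vec])).show()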

2 Answers


You could wrap the creation of the udf inside a function, so that it returns a udf with your vector already bound to it. An example is given below, hope this helps!

import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

def vector_column(x):
    # Build a zero-argument udf that always returns x, and call it
    # immediately so this function hands back a Column.
    return F.udf(lambda: x, VectorUDT())()

vec = DenseVector([1.0, 3.0, 2.9])
df.withColumn("vector", vector_column(vec)).show()

Output:

+---+-----+-------------+
|age| name|       vector|
+---+-----+-------------+
|  1|Alice|[1.0,3.0,2.9]|
|  2|  Bob|[1.0,3.0,2.9]|
+---+-----+-------------+
Florian
  • Thanks @Florian. Can confirm that this works. Could you explain why sometimes wrapping a udf inside a function is necessary? Or is this the recommended way to use a udf? – cylim Aug 16 '19 at 11:38
  • Well strictly speaking, you do not need to wrap the udf inside a function here; `df.withColumn("vector", F.udf(lambda: DenseVector([1.0, 3.0, 2.9]), VectorUDT())())` would also work (expanded into runnable form below). For reusability, however, I find it cleaner to put this statement in a function that returns the UDF for us. – Florian Aug 16 '19 at 12:41
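
Expanding Florian's comment into runnable form, using the same setup as the answer above, the inline version without the wrapper function would be:

import pyspark.sql.functions as F
from pyspark.ml.linalg import VectorUDT, DenseVector

# Create a zero-argument udf and invoke it immediately;
# every row gets the same constant vector.
df.withColumn("vector", F.udf(lambda: DenseVector([1.0, 3.0, 2.9]), VectorUDT())()).show()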

Another way to get the same effect without using udfs is to wrap the DenseVector in a single-row DataFrame and apply a cross join (Cartesian product):

import pyspark.sql.functions as F
from pyspark.ml.linalg import DenseVector

df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Bob', 'age': 2}])

df2 = spark.createDataFrame([{'vec' : DenseVector([1.0, 3.0, 2.9])}])
df.crossJoin(df2).show()

Output:

+---+-----+-------------+
|age| name|          vec|
+---+-----+-------------+
|  1|Alice|[1.0,3.0,2.9]|
|  2|  Bob|[1.0,3.0,2.9]|
+---+-----+-------------+
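
Since df2 has only one row, the cross join simply appends the same vector to every row of df. As a small sketch beyond the original answer, you could also hint Spark to broadcast the one-row side with the standard pyspark.sql.functions.broadcast helper, keeping the join a cheap map-side operation:

import pyspark.sql.functions as F

# Broadcast the single-row DataFrame so the cross join avoids a shuffle.
df.crossJoin(F.broadcast(df2)).show()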
Mahmoud