
How can I get a specific value out of a Spark DenseVector stored in a DataFrame column into a new column of the same DataFrame, without using a Python user-defined function (udf)?

More generally, how can I perform operations on vectors stored in a DataFrame column and put the results in a new column of the same DataFrame?

The following should be reproducible.

from pyspark.sql import SparkSession
from pyspark.mllib.linalg import DenseVector
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

testdf = spark.createDataFrame([
            (DenseVector([2, 3]),),
            (DenseVector([4, 5]),),
            (DenseVector([6, 7]),)],
            ['DenseVectors'])

These work for single extractions.

testdf.collect()[0][0][1]
3.0

testdf.collect()[0][0].dot(DenseVector([0, 1]))   
3.0

But I cannot get either approach to work when creating a new column.

testdf \
  .withColumn('test', testdf.DenseVectors[0][0][1])

> AnalysisException: u"Can't extract value from DenseVectors#211: need struct type but got vector;"


testdf \
  .withColumn('test', testdf.DenseVectors.dot(DenseVector([0, 1])))

> TypeError: 'Column' object is not callable
Clay
  • Please provide an example of your desired output, as what you ask is currently unclear (even if it worked, `testdf.withColumn('test', testdf.DenseVectors[0][0][1])` would fill a column with the constant value of `3.0`, which doesn't make much sense)... – desertnaut Jan 23 '18 at 20:44
  • @desertnaut I wanted to fill the new column with the second FloatType value of the DenseVector for each row. I wanted to do this without using the `df.rdd.map(func).toDF()` routine or a python udf. It does not look like that is possible. I used the former to complete the task, then drop the unused columns, then merge back to my original DataFrame on a key. – Clay Jan 23 '18 at 22:35
  • @desertnaut the first side note question could be moved to a new question. – Clay Jan 23 '18 at 22:37
  • indeed it should – desertnaut Jan 23 '18 at 22:37
  • @desertnaut the second side note question was because I was accidentally trying to complete `.dot` with a `pyspark.ml.linalg.DenseVector` and a `pyspark.mllib.linalg.DenseVector`. Once I made both the same type of `DenseVector`, the example I gave worked for both. – Clay Jan 23 '18 at 22:40
  • So, you want to avoid both RDD's *and* UDF's, and that's why the linked answer does not work for you? – desertnaut Jan 23 '18 at 22:41
  • 1
    @desertnaut the different types of `DenseVector`s were causing some trouble with that solution at first. Also, yes, I would like to have a way to do it directly to a DataFrame with out having to go to rdd and back, or use a python udf. Would I have to write a Scala function and then a pyspark wrapper for it? – Clay Jan 23 '18 at 22:51

0 Answers