
I'd like to go through each row of a PySpark DataFrame and change the value of one column based on the content of another column. The new value also depends on the current value of the column being changed.

Specifically, I have a column which contains DenseVectors, and another column which contains the index of the vector element that I need.

Alternatively, I could replace the DenseVector with the larger of its two values.

I am mainly trying to use F.when() in conjunction with withColumn, but I am running into trouble with the second argument of F.when(): I want to store the vector element at the correct index, but I cannot index directly into a column.

   a                        b  
1  DenseVector([0.1, 0.9])  1.0
2  DenseVector([0.6, 0.4])  0.0
.
.
.
df = df.withColumn('a', F.when(df.b == 0.0, df.a[0])
                   .otherwise(df.a[1]))
bmarks2010
  • Can you please explain what you mean by this: `I want to store the correct index of the vector, but cannot directly index on a column.`? – Jayram Kumar Apr 24 '19 at 20:01
  • @JayramKumar I want to store either the 0th, or 1st index of the vector in 'a', depending on whether that row is 1.0 or 0.0 in column 'b'. However, I cannot index over the vector in 'a' for a given row using something like `df.a[1]`, in the context of the code above. Based on the way `F.when` usually works, you'd expect to be able to index over the vector, but in this case you cannot. – bmarks2010 Apr 24 '19 at 20:45

1 Answer


I was able to derive a solution by following the information found in the answer to this question.

It would seem that Spark doesn't allow you to index directly into DenseVectors contained in a Spark DataFrame: the column holds a vector type rather than an array, so `df.a[0]` does not work. This can be solved by creating a user-defined function that accesses the elements the way you would index a NumPy array.

from pyspark.sql import functions as F
from pyspark.sql import types as T

# UDFs that read one element of the vector and return it as a float
firstelement = F.udf(lambda v: float(v[0]), T.FloatType())
secondelement = F.udf(lambda v: float(v[1]), T.FloatType())

# Keep element 0 where b == 0.0, otherwise keep element 1
df = df.withColumn('a', F.when(df['b'] == 0.0, firstelement('a'))
                   .otherwise(secondelement('a')))
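
The same idea can probably be written with a single UDF that takes both columns, which avoids F.when() entirely. Below is a rough sketch, not a tested drop-in: the helper names (`pick_element`, `largest_element`, `select_by_index`, `select_by_index_builtin`) are made up for illustration, and the column names 'a' and 'b' are taken from the question. The last function assumes Spark 3.0 or later, where `pyspark.ml.functions.vector_to_array` is available. The Spark imports are kept inside the wrapper functions so the plain-Python helpers can be tried without a Spark session.

```python
def pick_element(vector, index):
    """Return the element of `vector` at position `index` as a Python float.

    `index` may arrive as a float such as 1.0 (as in column 'b'),
    so it is cast to int before indexing.
    """
    return float(vector[int(index)])


def largest_element(vector):
    """Return the largest value in `vector` as a Python float."""
    return max(float(vector[i]) for i in range(len(vector)))


def select_by_index(df, vector_col="a", index_col="b"):
    """Replace `vector_col` with the element that `index_col` points at."""
    # Local imports: only needed when a Spark DataFrame is actually passed in.
    from pyspark.sql import functions as F
    from pyspark.sql import types as T

    pick = F.udf(pick_element, T.DoubleType())
    return df.withColumn(vector_col, pick(F.col(vector_col), F.col(index_col)))


def select_by_index_builtin(df, vector_col="a", index_col="b"):
    """Same result without a Python UDF (assumes Spark 3.0 or later)."""
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql import functions as F

    # An array column, unlike a vector column, can be indexed with [].
    arr = vector_to_array(F.col(vector_col))
    return df.withColumn(
        vector_col,
        F.when(F.col(index_col) == 0.0, arr[0]).otherwise(arr[1]),
    )
```

With a DataFrame shaped like the one in the question, `df = select_by_index(df)` should give the same values as the F.when() version above. The sketch uses DoubleType rather than FloatType so the values are not narrowed to 32-bit floats. `largest_element` could be wrapped in a UDF the same way to cover the "larger of the two values" alternative.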
bmarks2010