I'm trying to apply a custom function over the rows of a PySpark DataFrame. The function takes a row and two other vectors of the same dimension; it returns the sum of the third vector's values at every position where the row value matches the second vector.

import pandas as pd
import numpy as np

Function:

def V_sum(row, b, c):
    # Sum the entries of c at the positions where row equals b elementwise
    return float(np.sum(c[row == b]))

What I want to achieve is simple with pandas:

pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4'])
   t1  t2  t3  t4
0   0   1   0   0
1   1   1   0   0
2   0   0   1   0
3   1   0   1   1
4   1   1   0   0

B = np.array([1,0,1,0])
V = np.array([5,1,2,4])

pd_df.apply(lambda x: V_sum(x, B, V), axis=1)
0    4.0
1    9.0
2    7.0
3    8.0
4    9.0
dtype: float64
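
To see where, for example, the 9.0 in row 1 comes from, here is a quick trace of V_sum on that single row (using the B and V above):

row = np.array([1, 1, 0, 0])   # row 1 of pd_df
mask = row == B                # array([ True, False, False,  True])
print(float(np.sum(V[mask])))  # V[0] + V[3] = 5 + 4 = 9.0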

I would like to perform the same action in pyspark.

from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()
+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
|  0|  1|  0|  0|
|  1|  1|  0|  0|
|  0|  0|  1|  0|
|  1|  0|  1|  1|
|  1|  1|  0|  0|
+---+---+---+---+

I thought about using a UDF, but I can't get it to work:

from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

V_sum_udf = F.udf(V_sum, FloatType()) 
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()

Clearly I'm doing something wrong because it yields:

Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

1 Answer

If you've got non-column data that you want to use inside a function along with column data to compute a new column, a UDF + closure + withColumn as described here is a good place to start. Your attempt fails because F.udf(V_sum, FloatType()) produces a UDF that is called with a single argument (the array of column values), while V_sum expects three arguments; wrapping V_sum in a lambda that closes over B and V fixes this.

B = np.array([2, 0, 1, 0])  # np.array rather than a plain list, so row == B compares elementwise
V = np.array([5, 1, 2, 4])

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
# Spark is lazy: an action such as show() is needed before the UDF actually runs
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))).show()
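
As a minimal end-to-end check (a hypothetical variant that simply swaps in the question's original B, so the pandas output above can be reproduced):

B = np.array([1, 0, 1, 0])  # the question's original vector
V = np.array([5, 1, 2, 4])

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType())
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))).show()

The results column should then read 4.0, 9.0, 7.0, 8.0 and 9.0, matching the pandas output above.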
  • Thank you for your answer. I tried running your code; it doesn't throw any error until I try to display the results with show(). – Haboryme Dec 04 '17 at 07:44
  • Using spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))) gives me the correct result. Thanks for pointing me in the right direction. – Haboryme Dec 04 '17 at 08:04