I'm trying to apply a custom function to each row of a PySpark DataFrame. The function takes the row and two other vectors of the same dimension, and returns the sum of the values of the third vector at the positions where the row matches the second vector.
import pandas as pd
import numpy as np
Function:
def V_sum(row, b, c):
    # sum the entries of c at the positions where row equals b
    return float(np.sum(c[row == b]))
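For example, with row = [0,1,0,0], b = [1,0,1,0] and c = [5,1,2,4], the row matches b only in the last position, so V_sum returns 4.0.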
What I want to achieve is simple with pandas:
pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4'])
t1 t2 t3 t4
0 0 1 0 0
1 1 1 0 0
2 0 0 1 0
3 1 0 1 1
4 1 1 0 0
B = np.array([1,0,1,0])
V = np.array([5,1,2,4])
pd_df.apply(lambda x: V_sum(x, B, V), axis=1)
0 4.0
1 9.0
2 7.0
3 8.0
4 9.0
dtype: float64
I would like to perform the same operation in PySpark.
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()
+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
| 0| 1| 0| 0|
| 1| 1| 0| 0|
| 0| 0| 1| 0|
| 1| 0| 1| 1|
| 1| 1| 0| 0|
+---+---+---+---+
I thought about using a UDF, but I can't get it to work:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
V_sum_udf = F.udf(V_sum, FloatType())
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()
Clearly I'm doing something wrong because it yields:
Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
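My current guess is that the UDF only receives the single array column while V_sum expects three arguments, and that the column arrives as a plain Python list rather than a numpy array. A minimal sketch of the direction I'm considering, binding B and V through a closure (untested, and I'm not sure it's the idiomatic way to do this):

# B and V are captured from the enclosing scope; the array column arrives as a
# Python list, so it is converted to a numpy array before comparing against B.
V_sum_udf = F.udf(lambda row: float(np.sum(V[np.array(row) == B])), FloatType())

spk_df.select(
    V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")
).show()

Is this the right approach, or is there a better way to pass the extra vectors to the UDF?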