I am a beginner in PySpark trying to understand UDFs.

I have a PySpark dataframe p_b and I am calling a UDF on it, passing every row of the dataframe as a struct. Inside the UDF I want to access the debit column of the row, but this is not working. The snippets are below.

p_b has 4 columns: id, credit, debit, sum

Function:

def test(row): return('123'+row['debit'])

Converting to UDF

test_udf=udf(test,IntegerType())

Calling UDF on dataframe p_b

vals=test_udf(struct([p_b[x] for x in p_b.columns]))
print(type(vals))
print(vals)

Output

Column<b'test(named_struct(id, credit,debit,sum))'>

Bartosz Konieczny
pnv
  • It seems that you are trying to add '123' to every row of your dataframe, aren't you? – vikrant rana Aug 16 '19 at 11:02
  • You have to call your UDF on the dataframe using withColumn, and a dataframe column has to be passed as the argument. Define your function like this: def user_func(row): return row+123 – vikrant rana Aug 16 '19 at 11:07
  • my_func = udf(user_func, IntegerType()); newdf = df.withColumn('new_column', my_func(df.value)) – vikrant rana Aug 16 '19 at 11:07
  • See this for more details: https://stackoverflow.com/questions/57517381/how-do-i-use-multiple-conditions-with-pyspark-sql-funtions-when-from-a-dict/57518667#57518667 – vikrant rana Aug 16 '19 at 11:09
  • Thank you for the comments. I was trying to add '123' to all rows of the 'debit' column. – pnv Aug 16 '19 at 17:16

1 Answer

Let's first make a sample dataframe:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# `spark` is the SparkSession (predefined in the pyspark shell and notebooks)
schema = StructType([
    StructField("id", StringType(), True),
    StructField("credit", IntegerType(), True),
    StructField("debit", IntegerType(), True),
    StructField("sum", IntegerType(), True),
])
df = spark.createDataFrame(
    [("user_10", 100, 10, 110), ("user_11", 200, 20, 220), ("user_12", 300, 30, 330)],
    schema,
)
df.show()

which results in:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|   10|110|
|user_11|   200|   20|220|
|user_12|   300|   30|330|
+-------+------+-----+---+

Now, let's define the UDF that adds 123 to the value passed to it:

# x is a single column value (an integer here), not a whole row
def test(x):
    return 123 + x
test_udf=udf(test,IntegerType())
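
For comparison, the function in the question concatenates the string '123' with the column value. A minimal sketch in plain Python, assuming debit holds integers as in the sample dataframe above (the question does not show the schema of p_b), shows why that expression fails while the integer version works. The names question_version and integer_version are hypothetical, and a dict stands in for a Spark Row:

```python
def question_version(row):
    # Same expression as test() in the question: str + column value
    return "123" + row["debit"]


def integer_version(row):
    # Integer addition, matching the IntegerType() return type of the UDF
    return 123 + row["debit"]


# Assumption: debit is an integer, as in the sample dataframe.
sample_row = {"debit": 10}

try:
    question_version(sample_row)
    failure = None
except TypeError as exc:
    # str + int is not defined in Python
    failure = str(exc)

print("question version failed with:", failure)
print("integer version returns:", integer_version(sample_row))
```

Inside Spark the same exception would only surface once an action such as show() actually runs the UDF.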

And let's see how to use the UDF:

df2 = df.withColumn('debit', test_udf(col('debit')))
df2.show()

which results in:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|  133|110|
|user_11|   200|  143|220|
|user_12|   300|  153|330|
+-------+------+-----+---+

Note that now you probably need to recalculate the "sum" column:

df2 = df2.withColumn('sum', col('debit') + col('credit'))
df2.show()

which results in:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|  133|233|
|user_11|   200|  143|343|
|user_12|   300|  153|453|
+-------+------+-----+---+
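
The row-wise call from the question can also be made to work. Calling a UDF only builds a Column expression, which is why print(vals) showed Column<...> instead of values; the values are computed once the column is used in withColumn or select and an action such as show() runs. The following is a sketch under those assumptions, not the only way to do it; add_offset and with_offset_debit are hypothetical names:

```python
def add_offset(row, offset=123):
    """Plain Python logic: `row` is anything indexable by column name."""
    debit = row["debit"]
    # Nullable columns arrive as None, so pass nulls through unchanged.
    return None if debit is None else debit + offset


def with_offset_debit(df):
    """Return df with `debit` replaced by add_offset applied to each row."""
    # Imported here so add_offset can be tried without a Spark installation.
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import IntegerType

    add_offset_udf = udf(add_offset, IntegerType())
    # struct(...) packs all columns into one value; the UDF receives it as a Row.
    return df.withColumn("debit", add_offset_udf(struct(*df.columns)))
```

With the sample dataframe above, with_offset_debit(df).show() should display the same debit values as df2. Passing only the needed column, as in test_udf(col('debit')), avoids serializing the whole row and is usually the simpler choice.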
F4RZ4D