
I have a DataFrame and want to do some manipulation of the data in a function, depending on the values of the row.

def my_udf(row):
    threshold = 10
    if row.val_x > threshold:
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row

Does anyone know how to apply my udf to the DataFrame?

Lukas Müller

2 Answers


From my understanding, udf parameters are column names. Your example could be rewritten like this:

from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType

def change_val_x(val_x):
    threshold = 10
    if val_x > threshold:
        return another_function(val_x)
    else:
        return val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[1] -> val_y
    if arr[0] > threshold:
        return another_function(arr[1])
    else:
        return arr[1]

change_val_x_udf = udf(change_val_x, IntegerType())
change_val_y_udf = udf(change_val_y, IntegerType())

# apply these functions to your dataframe
df = df.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y')))\
       .withColumn('val_x', change_val_x_udf('val_x'))

To modify the val_x column, a simple udf is enough, but for val_y you need the values of both the val_x and val_y columns, so the solution is to pass them as an array. Note that this code is not tested...

See this question to apply udf on multiple columns.
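Since the snippet above is untested, here is a self-contained, plain-Python version of the two functions that you can sanity-check locally before wrapping them in `udf`. The `another_function` here is a stand-in that simply doubles its input (the real one is not shown in the question):

```python
# Stand-in for the asker's another_function, purely for illustration.
def another_function(val):
    return val * 2

def change_val_x(val_x):
    threshold = 10
    if val_x > threshold:
        return another_function(val_x)
    return val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[1] -> val_y
    if arr[0] > threshold:
        return another_function(arr[1])
    return arr[1]

# Quick local checks: above the threshold the value is transformed,
# below it the value passes through unchanged.
print(change_val_x(12))
print(change_val_y([12, 7]))
print(change_val_y([5, 7]))
```

Once the plain functions behave as expected, wrapping them with `udf(..., IntegerType())` as above only changes how Spark calls them, not the logic itself.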

Mederic Fourmy
  • it's `.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y')))` not `.withColumn('val_y', change_val_x_udf(array('val_x', 'val_y')))` – MaFF Aug 22 '17 at 14:57
  • Also you might have changed `val_x` value before using it in `change_val_y_udf`. – MaFF Aug 22 '17 at 15:07

It's better not to use UDFs if you can use pyspark functions instead. If you can't translate another_function into pyspark functions, you can do this:

from pyspark.sql.types import *
import pyspark.sql.functions as psf

def another_function(val):
    ...

another_function_udf = psf.udf(another_function, outputType())

where outputType() is the pyspark type corresponding to the output of another_function (IntegerType(), StringType()...)

threshold = 10  # same threshold as in the question

def apply_another_function(val):
    return psf.when(df.val_x > threshold, another_function_udf(val)).otherwise(val)

df = df.withColumn('val_y', apply_another_function(df.val_y))\
       .withColumn('val_x', apply_another_function(df.val_x))
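The `when`/`otherwise` logic above can be modeled in plain Python for a quick local check before running it on Spark. This sketch uses a hypothetical `another_function` that doubles its input and a dict per row; note that, like the Spark version, it applies the condition on the original val_x for both columns:

```python
threshold = 10

# Hypothetical stand-in for another_function, for illustration only.
def another_function(val):
    return val * 2

def apply_another_function(row):
    # Mirrors psf.when(df.val_x > threshold, another_function_udf(val)).otherwise(val):
    # if val_x is above the threshold, both columns get transformed,
    # otherwise the row passes through unchanged.
    if row["val_x"] > threshold:
        return {"val_x": another_function(row["val_x"]),
                "val_y": another_function(row["val_y"])}
    return row

rows = [{"val_x": 12, "val_y": 3}, {"val_x": 4, "val_y": 8}]
print([apply_another_function(r) for r in rows])
```

This also shows why the Spark version updates val_y before val_x: the condition must see the original val_x value, which matches the ordering of the two `withColumn` calls above.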
MaFF