
Is there a way to use a DataFrame inside a UDF (or a UDF alternative)? The project relies on a few helper functions to perform unit conversions.

Here is a brief example of the functions and how I would like to use them:

from pyspark.sql.functions import udf, col

def _convertUnit(product, unitOrigin, unitDestination, quantity):
    # Query a lookup DataFrame to get the conversion factor
    return quantity * conversionFactor

def _convertUnitUME(product, unit, quantity):
    # Query a lookup DataFrame by product to get the destination unit
    return _convertUnit(product, unit, destinationUnit, quantity)

convertUnitUME = udf(_convertUnitUME)
#--------

myDF = otherDF.select(
    col("somecol"),
    convertUnitUME(col("productID"), col("unitID"), col("quantity")).alias("conversion")
)
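
For completeness, the same lookups could probably be expressed as joins instead of a UDF. A minimal sketch of that alternative, assuming hypothetical lookup schemas df1(productID, destUnit) and df2(productID, unitID, destUnit, factor) that are not my real ones:

# Join-based alternative (hypothetical column names):
joined = (otherDF
    .join(df1, on="productID", how="left")
    .join(df2, on=["productID", "unitID", "destUnit"], how="left"))

myDF = joined.select(
    col("somecol"),
    (col("quantity") * col("factor")).alias("conversion")
)

Still, I would like to know whether the function-based approach is possible at all.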

I have tried wrapping the function in a lambda and broadcasting the DataFrames, and it works as long as I don't call the second function from within the first one:


def _convertUnit(df2, product, unitOrigin, unitDestination, quantity):
    # Query df2 (the broadcast, collected DataFrame) to get the conversion factor
    return quantity * conversionFactor

def _convertUnitUME(df1, df2, product, unit, quantity):
    # Query df1 by product to get the destination unit
    return _convertUnit(df2, product, unit, destinationUnit, quantity)

def convertUnitUME(df1, df2):
    # Broadcast the collected DataFrames and return the UDF
    _df1 = _sparkSession.sparkContext.broadcast(df1.collect())
    _df2 = _sparkSession.sparkContext.broadcast(df2.collect())
    return udf(lambda idP, idU, q: _convertUnitUME(_df1.value, _df2.value, idP, idU, q))

#-------
myDF = otherDF.select(
    col("somecol"),
    convertUnitUME(df1, df2)(col("productID"), col("unitID"), col("quantity")).alias("conversion")  # Note the double parentheses: func(dfs)(cols)
)
  • Spark is known to struggle with UDFs calling other functions. https://stackoverflow.com/questions/55688664/calling-another-custom-python-function-from-pyspark-udf Perhaps the only way is to nest your function within the function that calls it. – mck Nov 05 '20 at 14:30
  • You can broadcast the DataFrame (it needs to be small) and then use it inside your UDF. Here's an example: https://stackoverflow.com/questions/53041860/using-broadcasted-dataframe-in-pyspark-udf – user238607 Nov 05 '20 at 17:46
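
Putting both suggestions together, here is a minimal sketch of what that could look like (the lookup column names destUnit and factor are hypothetical, and both DataFrames must be small enough to collect and broadcast):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

def makeConvertUnitUME(df1, df2):
    # Collect the small lookup DataFrames into plain dicts and broadcast them;
    # the UDF closure then captures only picklable data, never a DataFrame.
    destUnit = _sparkSession.sparkContext.broadcast(
        {r["productID"]: r["destUnit"] for r in df1.collect()})
    factor = _sparkSession.sparkContext.broadcast(
        {(r["productID"], r["unitOrigin"], r["unitDestination"]): r["factor"]
         for r in df2.collect()})

    def _convertUnitUME(product, unit, quantity):
        # Nest the helper, as mck suggests, so the whole lookup travels in one closure
        def _convertUnit(product, unitOrigin, unitDestination, quantity):
            return quantity * factor.value[(product, unitOrigin, unitDestination)]
        return _convertUnit(product, unit, destUnit.value[product], quantity)

    # Explicit return type; udf() defaults to StringType otherwise
    return udf(_convertUnitUME, DoubleType())

convertUnitUME = makeConvertUnitUME(df1, df2)
myDF = otherDF.select(
    col("somecol"),
    convertUnitUME(col("productID"), col("unitID"), col("quantity")).alias("conversion")
)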
