Is there a way to use a DataFrame inside a UDF (or a UDF alternative)? There are some functions used throughout the project to perform unit conversions.
This is a brief example of the functions and how I would like to use them:
def _convertUnit(product, unitOrigin, unitDestination, quantity):
    """Convert *quantity* of *product* from unitOrigin to unitDestination.

    NOTE(review): the DataFrame lookup is elided in this example —
    ``conversionFactor`` is a placeholder for the value that query
    would return.
    """
    # Query a DataFrame to get conversion factor
    return quantity * conversionFactor
def _convertUnitUME(product, unit, quantity):
    """Convert *quantity* of *product* from *unit* to its UME destination unit.

    NOTE(review): ``destinationUnit`` is a placeholder for the result of the
    per-product DataFrame lookup described in the comment below.
    """
    # Query a DataFrame by product to get destination unit
    return _convertUnit(product, unit, destinationUnit, quantity)
# Wrap the plain Python function as a Spark UDF so it can be used in
# DataFrame column expressions (no return type specified, defaults to string).
convertUnitUME = udf(_convertUnitUME)
#--------
# Apply the UDF column-wise; "conversion" holds the converted quantity.
_projection = [
    col("somecol"),
    convertUnitUME(col("productID"), col("unitID"), col("quantity")).alias("conversion"),
]
myDF = otherDF.select(*_projection)
I have tried wrapping the function in a lambda and broadcasting the DataFrames, and it works as long as I don't call the second function from within the first one.
def _convertUnit(df2, product, unitOrigin, unitDestination, quantity):
    """Convert *quantity* between units using the broadcast lookup data *df2*.

    ``df2`` is the ``.value`` of a broadcast of ``df2.collect()`` (a list of
    Rows), so it can be queried inside an executor without touching a
    DataFrame. NOTE(review): ``conversionFactor`` is a placeholder for the
    value that lookup would yield.
    """
    # Query df2 to get conversion factor
    return quantity * conversionFactor
def _convertUnitUME(df1, df2, product, unit, quantity):
    """Resolve the destination unit for *product* from *df1*, then convert
    via :func:`_convertUnit` using the lookup data in *df2*.

    NOTE(review): ``destinationUnit`` is a placeholder for the df1 lookup
    result described in the comment below.
    """
    # Query df1 by product to get destination unit
    # BUG FIX(review): original was missing the comma between df2 and product.
    return _convertUnit(df2, product, unit, destinationUnit, quantity)
def convertUnitUME(df1, df2):
    """Return a UDF that closes over broadcast copies of *df1* and *df2*.

    Both DataFrames are collected to the driver and broadcast so their
    contents are available on executors as plain Python lists — a UDF
    cannot reference a DataFrame directly.
    """
    # broadcast the df and return lambda
    _df1 = _sparkSession.sparkContext.broadcast(df1.collect())
    _df2 = _sparkSession.sparkContext.broadcast(df2.collect())
    # BUG FIX(review): added the comma in the parameter list above and the
    # missing closing parenthesis on the udf(...) call below.
    return udf(lambda idP, idU, q: _convertUnitUME(_df1.value, _df2.value, idP, idU, q))
#-------
# Call the factory to obtain the UDF, then apply it to the columns —
# this is why the original used the func(arg1)(arg2) double-call shape.
_converter = convertUnitUME(df1, df2)
myDF = otherDF.select(
    col("somecol"),
    _converter(col("productID"), col("unitID"), col("quantity")).alias("conversion"),
)