Here is the situation:
We have a module where we define some functions that return a pyspark.sql.DataFrame (DF). To get those DFs we use some pyspark.sql.functions.udf defined either in the same file or in helper modules. When we actually write a job for PySpark to execute, we only import the functions from those modules (we provide a .zip file to --py-files) and then just save the resulting DataFrame to HDFS.
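
For context, a minimal sketch of the layout (module, file and column names are made up for illustration):

    # my_udfs.py -- shipped inside the .zip passed to --py-files
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # udf created at module import time -- the pattern that gives us trouble
    to_upper = udf(lambda s: s.upper(), StringType())

    def build_frame(df):
        return df.withColumn("upper", to_upper(df["value"]))

    # job.py -- submitted with spark-submit --py-files my_udfs.zip ...
    from pyspark.sql import SparkSession
    from my_udfs import build_frame

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("foo",)], ["value"])
    build_frame(df).write.parquet("hdfs:///some/output/path")  # placeholder path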
The issue is that when we do this, the udf function freezes our job. The nasty fix we found was to define the udf functions inside the job script and pass them to the functions imported from our module, roughly as sketched below.
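
A rough sketch of that workaround (same hypothetical names as above; build_frame is assumed to take the udf as an extra argument in this variant):

    # job.py -- udf defined in the job script and passed into the module function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    from my_udfs import build_frame  # comes from the --py-files zip

    spark = SparkSession.builder.getOrCreate()
    to_upper = udf(lambda s: s.upper(), StringType())  # created here, not in the module

    df = spark.createDataFrame([("foo",)], ["value"])
    build_frame(df, to_upper).write.parquet("hdfs:///some/output/path")  # placeholder path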
The other fix I found here is to define a class:
    from pyspark.sql.functions import udf

    class Udf(object):
        """Wrapper that only calls udf() when the instance itself is called."""
        def __init__(self, func, spark_type):
            self.func, self.spark_type = func, spark_type

        def __call__(self, *args):
            return udf(self.func, self.spark_type)(*args)
Then I use this class to define my udfs in the module. This works!
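
In the module it is used roughly like this (illustrative names again); note that the actual udf() call now happens only inside __call__, i.e. when the wrapper is applied to columns, not at import time:

    # my_udfs.py -- hypothetical usage; Udf is the wrapper class shown above
    from pyspark.sql.types import StringType

    to_upper = Udf(lambda s: s.upper(), StringType())  # no udf() call yet

    def build_frame(df):
        # the real udf is only created here, at call time
        return df.withColumn("upper", to_upper(df["value"]))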
Can anybody explain why we have this problem in the first place? And why this fix (the last one with the class definition) works?
Additional info: PySpark 2.1.0, deploying the job on YARN in cluster mode.
Thanks!