Here is the situation:

We have a module where we define some functions that return a pyspark.sql.DataFrame (DF). To build those DFs we use some pyspark.sql.functions.udf defined either in the same file or in helper modules. When we write the actual job for PySpark to execute, we only import those functions from our modules (we provide a .zip file via --py-files) and then save the resulting DataFrame to HDFS. A short sketch of the layout is below.
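
To make the setup concrete, here is a minimal sketch of what we do (all module, column and path names below are made up for illustration):

# helpers.py -- shipped to the cluster via --py-files (hypothetical example)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# the udf is built at module import time, before the job has created
# its SparkSession / SparkContext
clean_name = udf(lambda s: s.strip().lower() if s is not None else None, StringType())


def with_clean_name(df):
    # returns a pyspark.sql.DataFrame, as described above
    return df.withColumn("name_clean", clean_name(df["name"]))


# job.py -- the actual job we submit
from pyspark.sql import SparkSession
from helpers import with_clean_name

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("hdfs:///some/input")             # hypothetical path
with_clean_name(df).write.parquet("hdfs:///some/output")  # this is where it freezes for us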

The issue is that when we do this, the UDF freezes our job. The nasty fix we found was to define the udf functions inside the job itself and pass them as arguments to the functions imported from our module. The other fix I found here is to define a class:

from pyspark.sql.functions import udf


class Udf(object):
    def __init__(self, func, spark_type):
        self.func, self.spark_type = func, spark_type

    def __call__(self, *args):
        # udf(...) is only created here, when the wrapper is applied to
        # columns, not at module import time
        return udf(self.func, self.spark_type)(*args)

Then I use this class to define my UDFs in the module, and it works!
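
For illustration, this is roughly how I use the wrapper at module level (the lambda, column name and return type are just examples):

from pyspark.sql.types import StringType

# module-level definition: no call to udf() happens at import time
clean_name = Udf(lambda s: s.strip().lower() if s is not None else None, StringType())


def with_clean_name(df):
    # udf(...) is only built inside Udf.__call__, i.e. while the job is
    # running and a SparkContext exists
    return df.withColumn("name_clean", clean_name(df["name"]))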

Can anybody explain why we have this problem in the first place, and why the last fix (the one with the class definition) works?

Additional info: PySpark 2.1.0, deploying the job on YARN in cluster mode.

Thanks!

Vaidas Armonas

1 Answer

The accepted answer to the question you linked says: "My work around is to avoid creating the UDF until Spark is running and hence there is an active SparkContext." It looks like your issue is with serializing the UDF.

Make sure the Python functions you wrap as UDFs in your helper classes are static methods or module-level functions, and create the udf object only inside the public functions that you import elsewhere, so that it is not built at import time.

from pyspark.sql.functions import udf


class Helperclass(object):
    @staticmethod
    def my_udf_todo(...):
        ...

    @staticmethod
    def public_function_that_is_imported_elsewhere(...):
        # the udf object is only created here, at call time, when the job
        # already has an active SparkContext
        todo_udf = udf(Helperclass.my_udf_todo, RETURN_SCHEMA)
        ...
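
The job script then only creates the session and calls the imported function, e.g. something like this (the module, argument and path names are hypothetical):

# job.py, submitted with: spark-submit --py-files helpers.zip job.py
from pyspark.sql import SparkSession
from helpers import Helperclass

spark = SparkSession.builder.getOrCreate()
input_df = spark.read.parquet("hdfs:///path/to/input")
result = Helperclass.public_function_that_is_imported_elsewhere(input_df)
result.write.parquet("hdfs:///path/to/output")
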
greenie