
I have a class with a native Python function (it performs some imputations on a pandas DataFrame) that is used on grouped data with applyInPandas (https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html) inside another Spark method of the same class, like this:

import pandas as pd
from pyspark.sql import types as t
from pyspark.sql.session import SparkSession

spark_df = spark.read.option("header", "true").parquet(f'{path}')

class MyClass():

    def transform(self, spark_df):
        self.spark = SparkSession.builder.getOrCreate()

        final_df = self._MySparkMethod(spark_df)
        return final_df
        
    @staticmethod
    def _MyPandasImputerFunction(pdf: pd.DataFrame) -> pd.DataFrame:
        #
        # some pandas operations
        #
        return pandas_df
    
    def _MySparkMethod(self, spark_df):

        schema = t.StructType([
          t.StructField('col_1', t.StringType()),
          t.StructField('col_2', t.DoubleType())
        ])

        return spark_df.groupby("col_1")\
                       .applyInPandas(self._MyPandasImputerFunction, schema)
                
MyClass_Instance = MyClass()
df_result = MyClass_Instance.transform(spark_df)

When I execute it in Databricks it works, but when I execute the script inside a VM in the CI/CD build pipeline it throws this error:

2023-03-29 10:24:56,845 ERROR [Executor task launch worker for task 1.0 in stage 121.0 (TID 125)] executor.Executor (Logging.scala:logError(94)) - Exception in task 1.0 in stage 121.0 (TID 125)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 588, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 421, in read_udfs
    arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/home/vsts/work/1/s/coffee_segmentation/operations/processes/ca/consolidate_features.py", line 16, in <module>
    from coffee_segmentation.utils.utils import load_json, check_for_duplicates, presenceDict
  File "/home/vsts/work/1/s/coffee_segmentation/utils/utils.py", line 17, in <module>
    'CCH w/ coffee machine': ((f.col('source') == 'customer_md') & (f.col('has_coffee_machine') == 'YES')),
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 106, in col
    return _invoke_function("col", col)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 57, in _invoke_function
    jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
  File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 49, in _get_get_jvm_function
    return getattr(sc._jvm.functions, name)
AttributeError: 'NoneType' object has no attribute '_jvm'

Searching for the error online, it seems the VM can't find the Spark session.
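
For context, the line of utils.py that the traceback points to (line 17) sits in a dictionary that is built at module import time; I believe it is the presenceDict that gets imported. Simplified, it looks roughly like this:

# utils.py (simplified): the dict is built when the module is imported,
# so f.col() is evaluated at import time
from pyspark.sql import functions as f

presenceDict = {
    'CCH w/ coffee machine': ((f.col('source') == 'customer_md') & (f.col('has_coffee_machine') == 'YES')),
    # ... other entries
}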

I tried defining the Spark session at the start of the script as well, outside of the class, but I still get the same error.
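
Roughly what I added at the top of the script (outside the class):

# defined once at module level, before MyClass -- the error is unchanged
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()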

While debugging, I called the Spark session inside _MySparkMethod by reading a dict, creating a DataFrame from it and printing it, and that works just fine.
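
The debugging check inside _MySparkMethod looked roughly like this (the dict is just a placeholder):

    def _MySparkMethod(self, spark_df):
        # sanity check: the session is reachable here and this prints fine
        test_df = self.spark.createDataFrame([{'col_1': 'a', 'col_2': 1.0}])
        test_df.show()
        ...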

I'm not able to reproduce the same error in Databricks.

If possible, I would like to stick with applyInPandas rather than the legacy apply, which will be deprecated in the future. If pandas_udf could solve the issue, how would I implement it with a pandas DataFrame as the input of the UDF?
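
For reference, this is roughly how I imagine the pandas_udf version (names are placeholders), although PandasUDFType.GROUPED_MAP still goes through apply, which is what I'm trying to avoid:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# grouped-map pandas_udf: the function still receives a pandas DataFrame per group
@pandas_udf("col_1 string, col_2 double", PandasUDFType.GROUPED_MAP)
def my_pandas_imputer(pdf: pd.DataFrame) -> pd.DataFrame:
    # same imputation logic as _MyPandasImputerFunction
    return pdf

df_result = spark_df.groupby("col_1").apply(my_pandas_imputer)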
