I have a class with a plain Python function (performing some imputations on a pandas DataFrame) that is applied to grouped data via applyInPandas (https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html) inside another Spark method of the same class, like this:
import pandas as pd
from pyspark.sql import types as t
from pyspark.sql.session import SparkSession

spark_df = spark.read.option("header", "true").parquet(f'{path}')


class MyClass():

    def transform(self, spark_df):
        self.spark = SparkSession.builder.getOrCreate()
        final_df = self._MySparkMethod(spark_df)
        return final_df

    @staticmethod
    def _MyPandasImputerFunction(pdf: pd.DataFrame) -> pd.DataFrame:
        #
        # some pandas operations
        #
        return pandas_df

    def _MySparkMethod(self, spark_df):
        schema = t.StructType([
            t.StructField('col_1', t.StringType()),
            t.StructField('col_2', t.DoubleType())
        ])
        return spark_df.groupby("col_1")\
            .applyInPandas(self._MyPandasImputerFunction, schema)


MyClass_Instance = MyClass()
df_result = MyClass_Instance.transform(spark_df)
When I execute this in Databricks it works, but when I run the script inside a VM in the CI/CD build pipeline it throws this error:
2023-03-29 10:24:56,845 ERROR [Executor task launch worker for task 1.0 in stage 121.0 (TID 125)] executor.Executor (Logging.scala:logError(94)) - Exception in task 1.0 in stage 121.0 (TID 125)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 588, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 421, in read_udfs
arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 249, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
File "/home/vsts/work/1/s/coffee_segmentation/operations/processes/ca/consolidate_features.py", line 16, in <module>
from coffee_segmentation.utils.utils import load_json, check_for_duplicates, presenceDict
File "/home/vsts/work/1/s/coffee_segmentation/utils/utils.py", line 17, in <module>
'CCH w/ coffee machine': ((f.col('source') == 'customer_md') & (f.col('has_coffee_machine') == 'YES')),
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 106, in col
return _invoke_function("col", col)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 57, in _invoke_function
jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
File "/home/vsts/work/1/spark-3.1.2-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/functions.py", line 49, in _get_get_jvm_function
return getattr(sc._jvm.functions, name)
AttributeError: 'NoneType' object has no attribute '_jvm'
Searching for the error online, it seems the VM can't find the Spark session.
I also tried defining the Spark instance at the start of the script, outside of the class, but I still get the same error.
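What I tried looks roughly like this (simplified), at the top of the script before the class definition:

from pyspark.sql.session import SparkSession

# attempt: create/get the session at module level, before anything else runs
spark = SparkSession.builder.getOrCreate()
spark_df = spark.read.option("header", "true").parquet(f'{path}')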
While debugging I also tried using the Spark session inside _MySparkMethod to read a dict, create a DataFrame and print it, and that works just fine:
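The check I added inside _MySparkMethod was something along these lines:

# temporary sanity check inside _MySparkMethod: the session itself is usable
test_dict = {"col_1": ["a", "b"], "col_2": [1.0, 2.0]}
test_df = self.spark.createDataFrame(pd.DataFrame(test_dict))
test_df.show()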
I'm not able to reproduce the same error in Databricks.
If possible, I would like to stick with applyInPandas rather than fall back to the legacy apply, which will be deprecated in the future. If pandas_udf could solve the issue, how could I implement it with a pandas DataFrame as the input of the UDF?
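As far as I understand, the only pandas_udf variant that receives a whole pandas DataFrame per group is the GROUPED_MAP type used together with the legacy apply, which seems to be exactly what I'm trying to avoid. Is there another way, or would it have to look something like this (using the same schema and column names as in my example above)?

from pyspark.sql.functions import pandas_udf, PandasUDFType

# GROUPED_MAP pandas_udf: the function receives each group as a pandas DataFrame
@pandas_udf("col_1 string, col_2 double", PandasUDFType.GROUPED_MAP)
def impute(pdf: pd.DataFrame) -> pd.DataFrame:
    # same pandas imputation logic as _MyPandasImputerFunction
    return pdf

result = spark_df.groupby("col_1").apply(impute)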