
I am using pandas-on-spark in combination with regex to remove some abbreviations from a column in a dataframe. In pandas this all works fine, but I have to migrate this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-spark. I'm experiencing issues with the function below. I'm using it to clean up a number of abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list of three regex patterns, so the for loop runs 61x3 = 183 iterations). df["SchoneFunctie"] is a pyspark.pandas.Series of approx. 420k rows. I'm running this code on an Apache Spark pool in Azure Synapse, with Spark version 3.3 (to be more specific: 3.3.1.5.2-90111858).

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list
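
For intuition, here is what the first of these generated patterns does for a single abbreviation, checked with plain Python re (an illustration only, independent of Spark; the example strings are made up):

import re

# Illustration: the first pattern generated above for abb = "ceo"
abb = "ceo"
pattern = fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))'

print(bool(re.search(pattern, "de ceo zei")))  # True: "ceo" stands alone between delimiters
print(bool(re.search(pattern, "xceox")))       # False: "ceo" is embedded inside a longer token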

The problem & things I've tried:

As per the pandas-on-spark best practices docs, I'm trying to checkpoint my dataframe after this function, since it performs a lot of iterations and the lineage can therefore grow very quickly. df.spark.explain() gives a query plan of 373 lines. Please find a snippet of it below:

*(186) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6074 AS SchoneFunctie#5881]
+- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6073)#5878], [pythonUDF0#6074], 200
   +- *(185) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6073]
      +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6072)#5873], [pythonUDF0#6073], 200
         +- *(184) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6072]
            +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6071)#5868], [pythonUDF0#6072], 200
               +- *(183) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6071]
                  +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6070)#5863], [pythonUDF0#6071], 200
                     +- *(182) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6070]
                        +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6069)#5858], [pythonUDF0#6070], 200
                           +- *(181) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6069]
                              +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6068)#5853], [pythonUDF0#6069], 200
                                 +- *(180) Project [__index_level_0__#3945L, AfwijkendeFunctie#4774, WerkgeverFunctie#4776, CAOFunctie#4778, StandaardFunctieID#4780, FunctieID#4782, LoketFunctieNaam#4817, pythonUDF0#6068]
                                    +- ArrowEvalPython [pudf(__index_level_0__#3945L, pythonUDF0#6067)#5848], [pythonUDF0#6068], 200

However, whatever I try, I can't successfully run this function without running into errors.

Simply calling resolve_abbreviations and trying to checkpoint afterwards

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.checkpoint()

Results in the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [17], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1073, in SparkFrameMethods.checkpoint(self, eager)
   1070 from pyspark.pandas.frame import DataFrame
   1072 internal = self._psdf._internal.resolved_copy
-> 1073 checkpointed_sdf = internal.spark_frame.checkpoint(eager)
   1074 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:682, in DataFrame.checkpoint(self, eager)
    665 def checkpoint(self, eager: bool = True) -> "DataFrame":
    666     """Returns a checkpointed version of this :class:`DataFrame`. Checkpointing can be used to
    667     truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    668     iterative algorithms where the plan may grow exponentially. It will be saved to files
   (...)
    680     This API is experimental.
    681     """
--> 682     jdf = self._jdf.checkpoint(eager)
    683     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o8801.checkpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2403)
    at org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:166)
    at org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:60)
    at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
    at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
    at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:654)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

Trying to use local_checkpoint() instead of checkpoint

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
df = df.spark.local_checkpoint()

Results in a similar error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [21], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 df = df.spark.local_checkpoint()

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/spark/accessors.py:1111, in SparkFrameMethods.local_checkpoint(self, eager)
   1108 from pyspark.pandas.frame import DataFrame
   1110 internal = self._psdf._internal.resolved_copy
-> 1111 checkpointed_sdf = internal.spark_frame.localCheckpoint(eager)
   1112 return DataFrame(internal.with_new_sdf(checkpointed_sdf))

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:702, in DataFrame.localCheckpoint(self, eager)
    685 def localCheckpoint(self, eager: bool = True) -> "DataFrame":
    686     """Returns a locally checkpointed version of this :class:`DataFrame`. Checkpointing can be
    687     used to truncate the logical plan of this :class:`DataFrame`, which is especially useful in
    688     iterative algorithms where the plan may grow exponentially. Local checkpoints are
   (...)
    700     This API is experimental.
    701     """
--> 702     jdf = self._jdf.localCheckpoint(eager)
    703     return DataFrame(jdf, self.sparkSession)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o12529.localCheckpoint.
: org.apache.spark.SparkException: Job 32 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1196)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1194)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1194)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2897)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2794)
    at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2217)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1484)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2217)
    at org.apache.spark.SparkContext$$anon$3.run(SparkContext.scala:2154)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:958)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
    at org.apache.spark.rdd.LocalRDDCheckpointData.doCheckpoint(LocalRDDCheckpointData.scala:54)
    at org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:75)
    at org.apache.spark.rdd.RDD.$anonfun$doCheckpoint$1(RDD.scala:1913)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1903)
    at org.apache.spark.sql.Dataset.$anonfun$checkpoint$1(Dataset.scala:700)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:691)
    at org.apache.spark.sql.Dataset.localCheckpoint(Dataset.scala:678)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

Even when I try to break up the lineage by calling an action

df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
print(df.head(10))

I get an error:

/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:201: UserWarning: toPandas attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true, but has reached the error below and can not continue. Note that 'spark.sql.execution.arrow.pyspark.fallback.enabled' does not have an effect on failures in the middle of computation.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [23], line 2
      1 df['SchoneFunctie'] = resolve_abbreviations(df["SchoneFunctie"])
----> 2 print(df.head(10))

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12255, in DataFrame.__repr__(self)
  12252 if max_display_count is None:
  12253     return self._to_internal_pandas().to_string()
> 12255 pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  12256 pdf_length = len(pdf)
  12257 pdf = cast("DataFrame", pdf.iloc[:max_display_count])

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12246, in DataFrame._get_or_create_repr_pandas_cache(self, n)
  12243 def _get_or_create_repr_pandas_cache(self, n: int) -> Union[pd.DataFrame, pd.Series]:
  12244     if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  12245         object.__setattr__(
> 12246             self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  12247         )
  12248     return self._repr_pandas_cache[n]

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/frame.py:12241, in DataFrame._to_internal_pandas(self)
  12235 def _to_internal_pandas(self) -> pd.DataFrame:
  12236     """
  12237     Return a pandas DataFrame directly from _internal to avoid overhead of copy.
  12238 
  12239     This method is for internal use only.
  12240     """
> 12241     return self._internal.to_pandas_frame

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/utils.py:588, in lazy_property.<locals>.wrapped_lazy_property(self)
    584 @property
    585 @functools.wraps(fn)
    586 def wrapped_lazy_property(self):
    587     if not hasattr(self, attr_name):
--> 588         setattr(self, attr_name, fn(self))
    589     return getattr(self, attr_name)

File /opt/spark/python/lib/pyspark.zip/pyspark/pandas/internal.py:1056, in InternalFrame.to_pandas_frame(self)
   1054 """Return as pandas DataFrame."""
   1055 sdf = self.to_internal_spark_frame
-> 1056 pdf = sdf.toPandas()
   1057 if len(pdf) == 0 and len(sdf.schema) > 0:
   1058     pdf = pdf.astype(
   1059         {field.name: spark_type_to_pandas_dtype(field.dataType) for field in sdf.schema}
   1060     )

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:140, in PandasConversionMixin.toPandas(self)
    138 tmp_column_names = ["col_{}".format(i) for i in range(len(self.columns))]
    139 self_destruct = jconf.arrowPySparkSelfDestructEnabled()
--> 140 batches = self.toDF(*tmp_column_names)._collect_as_arrow(
    141     split_batches=self_destruct
    142 )
    143 if len(batches) > 0:
    144     table = pyarrow.Table.from_batches(batches)

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:358, in PandasConversionMixin._collect_as_arrow(self, split_batches)
    355         results = list(batch_stream)
    356 finally:
    357     # Join serving thread and raise any exceptions from collectAsArrowToPython
--> 358     jsocket_auth_server.getResult()
    360 # Separate RecordBatches from batch order indices in results
    361 batches = results[:-1]

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/cluster-env/clonedenv/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o16336.getResult.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:97)
    at org.apache.spark.security.SocketAuthServer.getResult(SocketAuthServer.scala:93)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: The "collectAsArrowToPython" action failed. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:552)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:564)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3869)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1(Dataset.scala:3792)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$1$adapted(Dataset.scala:3791)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$2(SocketAuthServer.scala:139)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1(SocketAuthServer.scala:141)
    at org.apache.spark.security.SocketAuthServer$.$anonfun$serveToStream$1$adapted(SocketAuthServer.scala:136)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:113)
    at org.apache.spark.security.SocketFuncServer.handleConnection(SocketAuthServer.scala:107)
    at org.apache.spark.security.SocketAuthServer$$anon$1.$anonfun$run$4(SocketAuthServer.scala:68)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:68)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:222)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:384)
    at org.apache.spark.sql.execution.CollectLimitExec.doExecute(limit.scala:70)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:230)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:268)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:265)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:226)
    at org.apache.spark.sql.Dataset.toArrowBatchRdd(Dataset.scala:3923)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$5(Dataset.scala:3810)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2(Dataset.scala:3815)
    at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToPython$2$adapted(Dataset.scala:3792)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3871)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562)
    ... 19 more

My Question

What is going wrong here? I understand that my lineage gets large because of the nested for loop. It seems that any action I perform crashes my application, but I don't know how to avoid it. It could also be that this is a pandas-on-spark issue, and that I would be better off using regular PySpark for this. At this point I am quite stuck, so any advice on solving this issue would be greatly appreciated.

Psychotechnopath
  • Have you verified that your query plan indeed gets very large by checking the output of `df.explain()`? What happens if you don't checkpoint? – Koedlt May 11 '23 at 04:57
  • Also, are you sure that you're using Apache Spark v3.3.0? Because if I take one of the lines in your stack trace to follow the error, for example `at org.apache.spark.SparkContext.runJob(SparkContext.scala:2350)` and try to find it in the source code at [line 2350](https://github.com/apache/spark/blob/f74867bddfbcdd4d08076db36851e88b15e66556/core/src/main/scala/org/apache/spark/SparkContext.scala#L2350) I see that this doesn't match up with your stack trace? – Koedlt May 11 '23 at 06:09
  • Yes, I have checked it. The query plan is a nested plan of 373 lines deep. I've edited a snippet of it into the question. I am sure I am running Spark version 3.3 - I've verified it. The exact build I'm on is 3.3.1.5.2-90111858. – Psychotechnopath May 11 '23 at 07:04
  • Hmm I'm trying to go through the source code but it's really hard if I don't know on which exact commit you are. Can you please tell me what the output is of [`sc.version`](https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/python/pyspark/context.py#L505) or `sparkContext.version`? I'm also not familiar with the syntax of your version number (3.3.1.5.2-90111858), could you tell me where you got it from? – Koedlt May 12 '23 at 20:34
  • It's what I see in the Spark UI - right next to the Apache Spark logo in the top left corner. I assumed this was the Spark version (since I am on Spark 3.3.1). Output of `print(sc.version)` is also 3.3.1.5.2-90111858. I am using Spark in an Azure Synapse Spark pool, so perhaps it's an Azure-specific build? I am sure that the general version is 3.3, as is seen here: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-33-runtime – Psychotechnopath May 16 '23 at 11:29

1 Answer

It seems like you might be hitting a situation where it's no longer reasonable to use the pandas API on Spark.

The fact that your query plan is so gigantic for something that should be a single Project stage is probably the problem.

You can go two ways from here:

  • Chop up your function into pieces and checkpoint your dataframe in between the function calls. This will make your query even less performant, but it will truncate your lineage at each checkpoint operation (a rough sketch of this approach is included at the end of this answer).
  • Just use plain PySpark for this. Below you can find an example:
from pyspark.sql.types import StringType
import pyspark.sql.functions as F

def resolve_abbreviations(df, colName):
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param df: dataframe that contains the job title column
    :param colName: name of the column in which the abbreviations should be written out
    :return: dataframe where abbreviations in colName are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    #Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    #For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            df = df.withColumn(colName, F.regexp_replace(colName, patt, value_to_replace))

    return df


df = spark.createDataFrame(
    [
        "Ik ben de 2e",
        "Jij bent de 1e geworden",
        "Ik wil de 3de zijn, maar ik ben de ceo",
        "Jij bent tech gezien de sr",
        "Later wil ik zw zijn"
    ],
    StringType()
)

There are only a few small differences between your function and mine:

  • The function takes two parameters: a dataframe and a string (the name of the column to clean).
  • The line that actually does the regex replacement, df.withColumn(colName, F.regexp_replace(colName, patt, value_to_replace)), uses native PySpark functions (withColumn, regexp_replace), so Spark can recognize that these operations don't each need to become a separate step in the query plan (see the sketch after this list).
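
As a side note, you can also build the whole chain of replacements as a single column expression yourself, for example with functools.reduce; the result is equivalent to the withColumn loop above. This is only a sketch, reusing abbreviations_dict and the same two patterns per abbreviation:

from functools import reduce
import pyspark.sql.functions as F

def abbreviation_expr(col, abbreviations_dict):
    # Build (pattern, replacement) pairs and fold them into one nested regexp_replace expression
    pairs = []
    for abb, full in abbreviations_dict.items():
        pairs.append((fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))', full))
        pairs.append((fr'{abb}\.', full))
    return reduce(lambda acc, pair: F.regexp_replace(acc, pair[0], pair[1]), pairs, col)

# Illustrative usage:
# df = df.withColumn("value", abbreviation_expr(F.col("value"), abbreviations_dict))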

Now, if I call the resolve_abbreviations function, I get the nicely translated text:

>>> resolve_abbreviations(df, "value").show(truncate=False)
+------------------------------------------------------------+
|value                                                       |
+------------------------------------------------------------+
|Ik ben de tweede                                            |
|Jij bent de eerste geworden                                 |
|Ik wil de derde zijn, maar ik ben de chief executive officer|
|Jij bent technisch gezien de senior                         |
|Later wil ik zelfstandig werkend zijn                       |
+------------------------------------------------------------+

And if you look at the query plan of this operation:

>>> resolve_abbreviations(df, "value").explain()
== Physical Plan ==
*(1) Project [regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(reg
exp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(value#0, ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))1e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1e\., eerste, 1), ((?<=( ))|(?<=(^))|(?<=
(\\))|(?<=(\()))1ste((?=( ))|(?=(\\))|(?=($))|(?=(\)))), eerste, 1), 1ste\., eerste, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), tweede, 1), 2e\., tweede, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))2de((?=( ))|(?=(\\))|(?=($))|(?=(\))
)), tweede, 1), 2de\., tweede, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3e((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3e\., derde, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))3de((?=( ))|(?=(\\))|(?=($))|(?=(\)))), derde, 1), 3de\., derde, 1), ((?<=( ))|(?<=(^))|(?<=
(\\))|(?<=(\()))ceo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief executive officer, 1), ceo\., chief executive officer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cfo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief financial officer, 1), cfo\., chief financial officer, 1), ((?<=( 
))|(?<=(^))|(?<=(\\))|(?<=(\()))coo((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief operating officer, 1), coo\., chief operating officer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))cto((?=( ))|(?=(\\))|(?=($))|(?=(\)))), chief technology officer, 1), cto\., chief technology of
ficer, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))sr((?=( ))|(?=(\\))|(?=($))|(?=(\)))), senior, 1), sr\., senior, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\()))tech((?=( ))|(?=(\\))|(?=($))|(?=(\)))), technisch, 1), tech\., technisch, 1), ((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\
()))zw((?=( ))|(?=(\\))|(?=($))|(?=(\)))), zelfstandig werkend, 1), zw\., zelfstandig werkend, 1) AS value#275]
+- *(1) Scan ExistingRDD[value#0]

You can see that even though you're doing many regex replacements, they are all part of a single Project step in the query plan. This will also make your operation much more performant.
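
For completeness, if you would rather stay on pandas-on-spark, the first option (splitting the work into batches and checkpointing in between) could look roughly like the sketch below. This is only an outline: the batch size of 10 is arbitrary, abbreviations_dict is the dictionary from your question, and a reliable checkpoint() would additionally need a checkpoint directory set via spark.sparkContext.setCheckpointDir().

import pyspark.pandas as pspd

def resolve_abbreviation_batch(job_list: pspd.Series, batch: dict) -> pspd.Series:
    """Apply only a subset (batch) of the abbreviation replacements."""
    for abb, value_to_replace in batch.items():
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        for patt in patterns:
            job_list = job_list.str.replace(pat=patt, repl=f'{value_to_replace} ', regex=True)
    return job_list

items = list(abbreviations_dict.items())
for i in range(0, len(items), 10):
    df['SchoneFunctie'] = resolve_abbreviation_batch(df['SchoneFunctie'], dict(items[i:i + 10]))
    df = df.spark.local_checkpoint()  # truncate the lineage after every batch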

Koedlt
  • Thanks for the answer! Can you elaborate or link a resource on what a `Project` step is? I thought Spark uses jobs and stages; is a `Project` equivalent to one of those? – Psychotechnopath May 22 '23 at 06:19
  • For sure! [This SO answer](https://stackoverflow.com/a/47527617/15405732) explains it really well. To directly answer your question about jobs and stages: the `Project` step (or more correctly, node) is part of your query plan (which is a tree). After having created your query plan, Spark divides the work into jobs/stages/tasks. So think of it as one of the logical steps Spark has to undertake (`Project` is not equivalent to your jobs/stages, but a `Project` can happen during a specific stage). – Koedlt May 22 '23 at 06:25
  • [This](https://www.youtube.com/watch?v=UZt_tqx4sII) YouTube video also gives a good introduction to reading Spark query plans. – Koedlt May 22 '23 at 06:29
  • Regarding your answer, you say 'so Spark can recognize that these operations don't each need to become a separate step in the query plan'. Why doesn't Spark 'recognize' this too when I'm using their pandas-on-spark API? Isn't that the whole point of making a specific pandas-on-spark API? Or am I still missing something here? – Psychotechnopath May 24 '23 at 08:50
  • I'm not sure, because I'm not familiar with the source code of pandas-on-spark. But my guess is that, at the moment, each pandas-on-spark "transformation" also triggers an ArrowEvalPython "action", which makes the [catalyst optimizer](https://www.databricks.com/glossary/catalyst-optimizer) unable to group all of the operations into one single operation. – Koedlt May 25 '23 at 15:32
  • Also, I guess the goal of pandas-on-spark is to onboard more users onto Spark by releasing an API that many people are already used to. That does not necessarily mean that this API will be as performant as writing the actual Spark code yourself. But take this with a grain of salt; this is opinionated and I don't have that much knowledge of pandas-on-spark itself! – Koedlt May 25 '23 at 15:34