I am using pandas-on-spark in combination with regex to remove some abbreviations from a column in a DataFrame. In pandas this all works fine, but I have the task of migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-spark. However, I am running into a weird error. I'm using the following function to clean up the abbreviations (somewhat simplified here for readability purposes; in reality, abbreviations_dict has 61 abbreviations and patterns is a list of three regex patterns).

import pyspark.pandas as pspd

def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
    """
    The job titles contain a lot of abbreviations for common terms.
    We write them out to create a more standardized job title list.

    :param job_list: df.SchoneFunctie during processing steps
    :return: SchoneFunctie where abbreviations are written out in words
    """
    abbreviations_dict = {
        "1e": "eerste",
        "1ste": "eerste",
        "2e": "tweede",
        "2de": "tweede",
        "3e": "derde",
        "3de": "derde",
        "ceo": "chief executive officer",
        "cfo": "chief financial officer",
        "coo": "chief operating officer",
        "cto": "chief technology officer",
        "sr": "senior",
        "tech": "technisch",
        "zw": "zelfstandig werkend"
    }

    # Create a list of abbreviations
    abbreviations_pob = list(abbreviations_dict.keys())

    # For each abbreviation in this list
    for abb in abbreviations_pob:
        # define patterns to look for
        patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                    fr'{abb}\.']
        # actual recoding of abbreviations to written out form
        value_to_replace = abbreviations_dict[abb]
        for patt in patterns:
            job_list = job_list.str.replace(pat=fr'{patt}', repl=f'{value_to_replace} ', regex=True)

    return job_list

When I then call the function with a pspd.Series and perform an action so the query plan is executed:

df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
print(df.head(100))

it throws a java.lang.StackOverflowError. The stack trace is too long to paste here in full, so I pasted a subset of it; it keeps repeating the same cycle.

23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)

It goes on like this for quite a while, until I get:

23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54483)
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
    self.handle()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
    poll(accum_updates)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
    if func():
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
----------------------------------------
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
    coro = func()
  File "<input>", line 194, in <module>
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
    pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
    self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
    return self._internal.to_pandas_frame
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
    setattr(self, attr_name, fn(self))
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
    pdf = sdf.toPandas()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
: <exception str() failed>

Some things I've tried / facts I think could be relevant:

  • For now I am running this locally on a subset of 5,000 rows of data, so data volume shouldn't be the problem. Perhaps increasing some default config could still help.
  • I think this has to do with Spark's lazy evaluation, and the DAG getting too big because of the for-loops in the function, but I have no idea how to solve the problem. As per the pandas-on-spark best practices documentation, I have tried to implement checkpointing, but it is not available for pspd.Series, and converting my Series into a pspd.DataFrame makes the .apply(lambda ...) fail inside the resolve_abbreviations function.

Any help would be greatly appreciated. Perhaps I am better off avoiding the pandas-on-spark API and rewriting the code in regular PySpark, as the pandas-on-spark API apparently isn't mature enough yet to run pandas scripts "as is"? Or perhaps our code design is flawed by nature and there is another, more efficient way to achieve similar results?
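For what it's worth, one alternative I'm considering is collapsing all replacements into a single regex pass, so the query plan would contain one str.replace instead of 183 chained ones. A plain-Python sketch of the idea (with simplified \w-based boundaries rather than my exact space/backslash/parenthesis boundaries; expand is just an illustrative helper, not my production code):

```python
import re

# Illustrative subset; the real abbreviations_dict has 61 entries.
abbreviations_dict = {
    "sr": "senior",
    "tech": "technisch",
    "ceo": "chief executive officer",
}

# One alternation that matches any abbreviation as a standalone token,
# optionally followed by a period.
pattern = re.compile(
    r"(?<!\w)(" + "|".join(map(re.escape, abbreviations_dict)) + r")\.?(?!\w)"
)

def expand(text: str) -> str:
    # Look up the written-out form of whichever abbreviation matched.
    return pattern.sub(lambda m: abbreviations_dict[m.group(1)], text)

print(expand("sr tech advisor to the ceo."))
# senior technisch advisor to the chief executive officer
```

If pspd.Series.str.replace accepts a callable repl the way pandas does, a single call with this pattern might keep the plan flat, though I haven't verified that.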

Psychotechnopath

1 Answer


Is it possible that your input data is deeply nested? This could contribute to the looping stack calls you can see in there.

The first thing I would try is running with a larger stack size than you're using now. I'm not sure what OS/Java version you're running this on, so I can't know the default stack size on your machine. Typically, though, it is on the order of 100 KB to 1024 KB.

Try running it with a stack size of 4 MB. In the JVM, this is set with the -Xss parameter. You'll want to set it on the driver, via the spark.driver.extraJavaOptions config parameter. Something like this:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("whateverMasterYouHave")
        .setAppName("MyApp")
        .set("spark.driver.extraJavaOptions", "-Xss4M"))
sc = SparkContext.getOrCreate(conf=conf)
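If you launch the job with spark-submit instead of building the context in code, the same setting can, as far as I know, be passed on the command line (the script name here is just a placeholder):

```shell
# Same stack-size bump, applied at submit time rather than in code
spark-submit --conf "spark.driver.extraJavaOptions=-Xss4M" your_script.py
```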
Koedlt
  • Thanks. Do you have a link to additional information regarding this? For example, where did you learn that the Xss parameter controls the stack size? – Psychotechnopath May 04 '23 at 08:02
  • 1
    Apache Spark is written in Scala, which runs in something called the Java Virtual Machine. It's very useful to have some JVM knowledge when you're working with Spark (even if you're writing Python, the underlying basis is the JVM). More information about the `Xss` parameter can be found [here](https://docs.oracle.com/cd/E13150_01/jrockit_jvm/jrockit/jrdocs/refman/optionX.html), but I also suggest reading up on the JVM on websites like [this one](https://www.freecodecamp.org/news/jvm-tutorial-java-virtual-machine-architecture-explained-for-beginners/) or maybe some introductory tutorials. – Koedlt May 04 '23 at 10:02
  • Much appreciated - I already figured that some JVM knowledge would be helpful. – Psychotechnopath May 04 '23 at 11:00
  • Is the StackOverflowError caused by a low stack size, or am I right in my assumption about lazy evaluation, and is this code therefore flawed/an antipattern to begin with? Increasing the stack size solves the problem on my local PC, but I'm still wary of applying this code in production, as I don't want to skyrocket our cloud computing costs. I think a code pattern like the one presented here is common in pandas, so the question could be applicable to a larger audience. Therefore, I will award a bounty to an answer that addresses the lazy evaluation part, too. Perhaps you know something about this? – Psychotechnopath May 04 '23 at 12:12
  • 1
    After having a closer look at your stack trace, I realise that the first paragraph in my answer is wrong. I will edit it. Anyways, I don't think this is related to the lazy evaluation model Spark has. Can you edit your post and add a longer version of your stack trace that you already posted? Particularly lines showing `at org.apache.spark....` to see where the error is taking place in Spark itself? Is it possible that your input data is deeply nested? This could contribute to the looping stack calls I'm seeing in there. – Koedlt May 04 '23 at 13:51
  • I've edited my question to include a longer version of the stack trace. There don't seem to be any at org.apache.spark lines in it. AFAIK, my data is not nested; I'm simply cleaning a column of strings, no complex data structures. Please do note, however, that the function I'm presenting here is somewhat simplified for readability purposes. In reality, abbreviations_dict has 61 abbreviations and patterns is a list of three regexes, so the double for-loop in the function runs for 61x3 = 183 iterations in total. – Psychotechnopath May 05 '23 at 08:03
  • Hmmm interesting! The stack trace that you've added looks like it might be the executor stack trace. Do you also have the driver stack trace available? We should be finding some apache spark line somewhere? Maybe it helps to write all of your logs to a text file? Something like `spark-submit &> myTextFile`? And then you can search in there for the `org.apache.spark` lines? – Koedlt May 08 '23 at 10:44
  • 1
    I'm using two ways to run Spark code on my local machine. The first is through the Python console in PyCharm, using a conda environment with `pip install pyspark`. The second is through a full Spark install on WSL Linux; that is the only place I can use spark-submit. Running the code on WSL Linux works fine. Perhaps the StackOverflowError was caused by the Python shell allocating fewer resources to Spark? At least I'm now quite confident that the code pattern was not the problem, as it runs fine on other installs. That was my major concern, therefore I've accepted your answer. – Psychotechnopath May 09 '23 at 14:51
  • I'm still running into issues with this code, and posted a separate question on it. Perhaps you can shed your light on it? https://stackoverflow.com/questions/76218874/how-do-i-run-a-function-that-applies-regex-iteratively-in-pandas-on-spark-api – Psychotechnopath May 10 '23 at 13:15