I am using pandas-on-Spark in combination with regex to replace abbreviations in a column of a DataFrame with their written-out forms. In pandas this all works fine, but I have been tasked with migrating this code to a production workload on our Spark cluster, and therefore decided to use pandas-on-Spark. However, I am running into a weird error. I'm using the following function to resolve the abbreviations (somewhat simplified here for readability; in reality abbreviations_dict has 61 abbreviations and patterns is a list of three regex patterns):

    import pyspark.pandas as pspd

    def resolve_abbreviations(job_list: pspd.Series) -> pspd.Series:
        """
        The job titles contain a lot of abbreviations for common terms.
        We write them out to create a more standardized job title list.
        :param job_list: df.SchoneFunctie during processing steps
        :return: SchoneFunctie where abbreviations are written out in words
        """
        abbreviations_dict = {
            "1e": "eerste",
            "1ste": "eerste",
            "2e": "tweede",
            "2de": "tweede",
            "3e": "derde",
            "3de": "derde",
            "ceo": "chief executive officer",
            "cfo": "chief financial officer",
            "coo": "chief operating officer",
            "cto": "chief technology officer",
            "sr": "senior",
            "tech": "technisch",
            "zw": "zelfstandig werkend"
        }
        # Create a list of abbreviations
        abbreviations_pob = list(abbreviations_dict.keys())
        # For each abbreviation in this list
        for abb in abbreviations_pob:
            # Define the patterns to look for
            patterns = [fr'((?<=( ))|(?<=(^))|(?<=(\\))|(?<=(\())){abb}((?=( ))|(?=(\\))|(?=($))|(?=(\))))',
                        fr'{abb}\.']
            # Actual recoding of abbreviations to their written-out form
            value_to_replace = abbreviations_dict[abb]
            for patt in patterns:
                job_list = job_list.str.replace(pat=patt, repl=f'{value_to_replace} ', regex=True)
        return job_list

When I then call the function on a pspd Series and trigger an action so that the query plan is executed:

    df['SchoneFunctie'] = resolve_abbreviations(df['SchoneFunctie'])
    print(df.head(100))

it throws a java.lang.StackOverflowError. The stack trace is too long to paste here, so I have included only a subset of it, since it keeps repeating:

    23/05/05 09:53:14 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 4) (PC ID executor driver): java.lang.StackOverflowError
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2408)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
        at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
        at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1185)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2319)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2428)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2352)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2210)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1690)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)

It goes on like this for quite a while, until I get:

    23/05/03 14:19:11 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
    ----------------------------------------
    Exception occurred during processing of request from ('127.0.0.1', 54483)
    Traceback (most recent call last):
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 316, in _handle_request_noblock
        self.process_request(request, client_address)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 347, in process_request
        self.finish_request(request, client_address)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 360, in finish_request
        self.RequestHandlerClass(request, client_address, self)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socketserver.py", line 747, in __init__
        self.handle()
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 281, in handle
        poll(accum_updates)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 253, in poll
        if func():
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\accumulators.py", line 257, in accum_updates
        num_updates = read_int(self.rfile)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\serializers.py", line 593, in read_int
        length = stream.read(4)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
        return self._sock.recv_into(b)
    ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
    ----------------------------------------
    ERROR:root:Exception while sending command.
    Traceback (most recent call last):
      File "C:\Program Files\JetBrains\PyCharm 2021.3\plugins\python\helpers\pydev\pydevconsole.py", line 364, in runcode
        coro = func()
      File "<input>", line 194, in <module>
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12255, in __repr__
        pdf = cast("DataFrame", self._get_or_create_repr_pandas_cache(max_display_count))
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12246, in _get_or_create_repr_pandas_cache
        self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\frame.py", line 12241, in _to_internal_pandas
        return self._internal.to_pandas_frame
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\utils.py", line 588, in wrapped_lazy_property
        setattr(self, attr_name, fn(self))
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\pandas\internal.py", line 1056, in to_pandas_frame
        pdf = sdf.toPandas()
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\pandas\conversion.py", line 205, in toPandas
        pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\dataframe.py", line 817, in collect
        sock_info = self._jdf.collectToPython()
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\pyspark\sql\utils.py", line 190, in deco
        return f(*a, **kw)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 511, in send_command
        answer = smart_decode(self.stream.readline()[:-1])
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\socket.py", line 704, in readinto
        return self._sock.recv_into(b)
    ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
        response = connection.send_command(command)
      File "C:\Users\MyUser\.conda\envs\Anaconda3.9\lib\site-packages\py4j\clientserver.py", line 539, in send_command
        raise Py4JNetworkError(
    py4j.protocol.Py4JNetworkError: Error while sending or receiving
    : <exception str() failed>

Some things I've tried, and facts I think could be relevant:
- For now I am running this locally, on a subset of 5,000 rows, so data volume shouldn't be the problem. Perhaps increasing some default configuration setting could still help.
- I think this has to do with Spark's lazy evaluation, and the Spark DAG growing too large because of the for-loops in the function. But I have no idea how to solve the problem. Following the pandas-on-Spark best practices documentation, I have tried to implement checkpointing, but this is not available for pspd.Series, and converting my Series into a pspd.DataFrame makes the .apply(lambda ...) fail inside the resolve_abbreviations function.
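The suspicion in the last bullet can be illustrated with a toy model in plain Python. This is not Spark's actual plan representation, and `build_chained_expression`, `nesting_depth` and `walk_overflows` are hypothetical helpers. Each chained replace wraps the previous expression, so the nesting depth equals the number of abbreviations times the number of patterns (13 × 2 = 26 for the simplified snippet, 61 × 3 = 183 for the real code), and a recursive walk over it needs one stack frame per level. Java deserialization of nested objects is similarly recursive, which would be consistent with the repeating `readObject` frames in the trace, although how many frames Spark spends per level is not known here.

```python
import sys


def build_chained_expression(n_abbreviations: int, n_patterns: int):
    """Toy stand-in for a column expression: each replace wraps the previous one."""
    expr = "SchoneFunctie"
    for abb_idx in range(n_abbreviations):
        for patt_idx in range(n_patterns):
            # Mirrors `job_list = job_list.str.replace(...)` inside the two for-loops.
            expr = ("replace", expr, abb_idx, patt_idx)
    return expr


def nesting_depth(expr) -> int:
    """Recursive walk (like a recursive deserializer): one Python stack frame per level."""
    if not isinstance(expr, tuple):
        return 0
    return 1 + nesting_depth(expr[1])


def walk_overflows(depth: int) -> bool:
    """Return True if recursively walking an expression of this depth exhausts the stack."""
    expr = build_chained_expression(depth, 1)
    try:
        nesting_depth(expr)
    except RecursionError:
        return True
    return False


print(nesting_depth(build_chained_expression(13, 2)))  # prints 26 (simplified snippet)
print(nesting_depth(build_chained_expression(61, 3)))  # prints 183 (real code)
print(walk_overflows(sys.getrecursionlimit()))         # prints True
```

Python raises `RecursionError` where the JVM raises `StackOverflowError`; in both cases the depth of the nested structure, not the number of rows, is what exhausts the stack.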
Any help would be greatly appreciated. Perhaps I am better off avoiding the pandas-on-Spark API and rewriting the code in regular PySpark, since the pandas-on-Spark API apparently isn't mature enough yet to run pandas scripts "as is"? Or perhaps our code design is inherently flawed, and there is another efficient way to achieve similar results?
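One alternative design, sketched under the assumption that the long chain of replace calls is what triggers the overflow, is to fold all abbreviations into a single compiled regex and expand them in one pass per title. `ABBREVIATIONS`, `ABBREVIATION_RE` and `resolve_abbreviations_text` are hypothetical names, and the dictionary is the 13-entry subset from the question. The sketch deliberately deviates from the original patterns in three ways: the dotted form (e.g. `sr.`) also requires a left boundary, an expansion is never re-scanned by later patterns, and the double spaces produced by the trailing space are collapsed.

```python
import re

# Subset of the abbreviations from the question (the real dict has 61 entries).
ABBREVIATIONS = {
    "1e": "eerste", "1ste": "eerste",
    "2e": "tweede", "2de": "tweede",
    "3e": "derde", "3de": "derde",
    "ceo": "chief executive officer",
    "cfo": "chief financial officer",
    "coo": "chief operating officer",
    "cto": "chief technology officer",
    "sr": "senior",
    "tech": "technisch",
    "zw": "zelfstandig werkend",
}

# Longest keys first, so a longer abbreviation wins over a shorter prefix of it.
_ALTERNATION = "|".join(
    re.escape(abb) for abb in sorted(ABBREVIATIONS, key=len, reverse=True)
)

# Left boundary: start of string, or preceded by a space, backslash or "(".
# Right boundary: a trailing "." (consumed), or followed by a space, backslash, ")" or end.
ABBREVIATION_RE = re.compile(
    rf"(?:^|(?<=[ \\(]))({_ALTERNATION})(?:\.|(?=[ \\)]|$))"
)


def resolve_abbreviations_text(text: str) -> str:
    """Expand every abbreviation in a single job title in one regex pass."""
    # Like the original, append a space after each expansion ...
    expanded = ABBREVIATION_RE.sub(lambda m: ABBREVIATIONS[m.group(1)] + " ", text)
    # ... then collapse the resulting runs of spaces and trim both ends.
    return re.sub(r" {2,}", " ", expanded).strip()


print(resolve_abbreviations_text("sr. developer"))     # senior developer
print(resolve_abbreviations_text("1e medewerker zw"))  # eerste medewerker zelfstandig werkend
print(resolve_abbreviations_text("technicus"))         # technicus (no boundary after "tech")
```

In pandas-on-Spark, this per-title function could be applied through `Series.pandas_on_spark.transform_batch`, for example `job_list.pandas_on_spark.transform_batch(lambda s: s.map(resolve_abbreviations_text, na_action="ignore"))`, which should put one batch UDF in the plan instead of one step per abbreviation and pattern. That wiring is an untested assumption and has not been tried on the full 61-abbreviation list.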