0

I am trying to apply a UDF to a column on my dataframe, and I just can't figure out what I'm doing wrong. I am using Spark 2.4 and Python 3.7.

I distilled my problem down to a minimal example as follows:

I read in a small table with 2 columns, and ~500 rows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.csv('sample.csv', header=True)
df.show()

This yields the following:

+--------------------+--------------+
|       ENCRYPTED_SSN|ACCESSION_DATE|
+--------------------+--------------+
|0029118BA638440EF...|6/26/2006 0:00|
|B01A52270F8BE97E4...|11/5/2003 0:00|
|0FB32636600EDF055...| 6/2/2002 0:00|
|E34E4B5C218A15DEC...|          null|
|6A81A865E2BC7BB47...|          null|
|66BA4C200ABDB9260...|9/25/1976 0:00|
|06F37D4AF59E76532...|          null|
|ECF5B8DAF84B5D675...|          null|
|8556B03FA6441FE90...|7/21/2008 0:00|
|2512544ECD28E40B7...|4/17/2001 0:00|
|EC11EA092F03BF91A...| 3/6/2000 0:00|
|89A057D8F31A02AAD...|9/17/2007 0:00|
|438C432044DF1420A...| 2/2/2001 0:00|
|A3AD0B5D3F227D309...|9/17/2007 0:00|
|937537432FED4252E...|          null|
|16007518571A365C9...|6/26/2006 0:00|
|DBF9233C9E95C3F73...|7/24/2007 0:00|
|4592AD22E1D5A49F7...|8/25/1980 0:00|
|77A003ADFFAD17007...| 1/4/2005 0:00|
|2CE141EA59B95F05A...|7/16/2007 0:00|
+--------------------+--------------+
only showing top 20 rows

Next I try to define a dummy UDF and apply it to the second column:

from pyspark.sql.functions import udf
test_udf = udf(lambda x: 'dummy result')


df = df.withColumn('ACCESSION_DATE', test_udf(df.ACCESSION_DATE))
df.show()

This results in:

py4j.protocol.Py4JJavaError: An error occurred while calling o52.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4) (prevay-w10 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 24 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
    ... 24 more

Any help would be appreciated as I am stumped.

Peter
  • 191
  • 2
  • 8
  • Can you look at the very last line of the error message? Does it say `Caused by: java.lang.OutOfMemoryError:`? Then you ran out of memory. – Michael Szczesny Sep 07 '21 at 19:33
  • I cannot see the last line, because Jupyter cuts down the number of lines it shows in error messages, and I am not sure how to change that. However, I doubt that I am running out of memory, as the dataframe I am using to test this is very small. – Peter Sep 07 '21 at 19:58
  • @Peter maybe [this](https://stackoverflow.com/questions/3702675/how-to-catch-and-print-the-full-exception-traceback-without-halting-exiting-the/16946886#16946886) can help in catching the whole error message, so we can understand better what's going on – Ric S Sep 08 '21 at 07:38
  • @RicS Thanks, I tried the suggested solution, but it had no effect. I edited my question to show the entire traceback as it shows up for me, but it is still not displaying all lines. Hopefully that helps at least somewhat. – Peter Sep 08 '21 at 17:15
  • The error `org.apache.spark.SparkException: Python worker failed to connect back` means that something is wrong with the Spark installation, I don't know what exactly. Check [here](https://stackoverflow.com/questions/60916259/sparkexception-python-worker-failed-to-connect-back-when-execute-spark-action) for potential information – Ric S Sep 09 '21 at 07:42

0 Answers0