0

I have a simple udf written in Python that I altered from a code example in the book Apache Spark in 24 hours. The book uses an older version of Spark and I am running 2.3.3.

I did find this answer but I'm having a hard time figuring out why the book example doesn't work and I'm not really sure if that answer actually does address my question. I'm running this in local mode on Windows 10.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

df = spark.read.csv("full201801.dat",header="true")

columntransform = udf(lambda x: 'Non-Fat Dry Milk' if x == '23040010' else 'foo', StringType())

df.select(df.PRODUCT_NC, columntransform(df.PRODUCT_NC).alias('COMMODITY')).show()


Py4JJavaError: An error occurred while calling o110.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 242, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 144, in read_udfs
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 120, in read_single_udf
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in _read_with_length
    return self.loads(obj)
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads
    return pickle.loads(obj, encoding=encoding)
TypeError: _fill_function() missing 4 required positional arguments: 'defaults', 'dict', 'module', and 'closure_values'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 242, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 144, in read_udfs
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 120, in read_single_udf
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in _read_with_length
    return self.loads(obj)
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads
    return pickle.loads(obj, encoding=encoding)
TypeError: _fill_function() missing 4 required positional arguments: 'defaults', 'dict', 'module', and 'closure_values'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:83)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:66)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Bob Wakefield
  • 3,739
  • 4
  • 20
  • 30

2 Answers2

1

Here the solution to call UDF in pyspark

create a method with functional logic

def getItem(it):
    return 'Non-Fat Dry Milk' if it == '23040010' else 'foo'

register the above method in Pyspark UDF

from pyspark.sql import functions as F


columntransform = F.udf(getItem, StringType())

Call the UDF in Dataframe

    ll = ["23040010", "23040011", "23040012", "23040013", "23040010"]
    n_rdd = sc.parallelize(ll).map(lambda row: Row(row))
    df = sql.createDataFrame(n_rdd, ["nums"])
    df.withColumn("NewItem",columntransform(df["nums"]))

Here the output:

+--------+----------------+
|    nums|         NewItem|
+--------+----------------+
|23040010|Non-Fat Dry Milk|
|23040011|             foo|
|23040012|             foo|
|23040013|             foo|
|23040010|Non-Fat Dry Milk|
+--------+----------------+
Thirupathi Chavati
  • 1,711
  • 12
  • 10
1

I'm not entirely certain what the real issue was, but when I moved all of this to a box running CENTOS 7,everything worked as expected. It's not a problem with the code. It's something with my windows set up.

Bob Wakefield
  • 3,739
  • 4
  • 20
  • 30