I am attempting to make a new column from another column in Apache Spark.
The data (heavily abbreviated) looks like
Date                          Day_of_Week
2018-05-26T00:00:00.000+0000  5
2018-05-05T00:00:00.000+0000  6
and should look like
Date                          Day_of_Week  Weekday
2018-05-26T00:00:00.000+0000  5            Thursday
2018-05-05T00:00:00.000+0000  6            Friday
I have tried the advice from the manual (https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf), as well as from "How to pass a constant value to Python UDF?" and "PySpark add a column to a DataFrame from a TimeStampType column",
which resulted in:
def int2day(day_int):
    if day_int == 1:
        return 'Sunday'
    elif day_int == 2:
        return 'Monday'
    elif day_int == 3:
        return 'Tuesday'
    elif day_int == 4:
        return 'Wednesday'
    elif day_int == 5:
        return 'Thursday'
    elif day_int == 6:
        return 'Friday'
    elif day_int == 7:
        return 'Saturday'
    else:
        return 'FAIL'

spark.udf.register("day", int2day, IntegerType())

df2 = df.withColumn("Day", day("Day_of_Week"))
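For what it's worth, the mapping itself behaves as expected when called directly in plain Python (logic reproduced here as a dict lookup so the snippet is self-contained), so the failure seems to be in how the UDF is registered or applied, not in the function:

```python
# Same day-number-to-name logic as int2day above, written as a dict lookup.
DAY_NAMES = {1: 'Sunday', 2: 'Monday', 3: 'Tuesday', 4: 'Wednesday',
             5: 'Thursday', 6: 'Friday', 7: 'Saturday'}

def int2day(day_int):
    # Fall back to 'FAIL' for anything outside 1-7, as in the original.
    return DAY_NAMES.get(day_int, 'FAIL')

print(int2day(5))  # Thursday
print(int2day(8))  # FAIL
```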
which gives a long error:
SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
    serialized = self.dumps(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I don't see how I can apply "How to pass a constant value to Python UDF?" here, as their example was much simpler (it only returned true or false).
I've also tried using map functions, as in "PySpark add a column to a DataFrame from a TimeStampType column", but

df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date")))

just raises TypeError: argument 2 to map() must support iteration, even though I thought col does support iteration.
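For comparison, the builtin map() does work when its second argument is an ordinary iterable such as a list, which is presumably why it rejects a Column object:

```python
# Plain-Python map() over a list of day numbers; a Spark Column is not
# an iterable like this, hence the TypeError above.
day_names = {1: 'Sunday', 2: 'Monday', 3: 'Tuesday', 4: 'Wednesday',
             5: 'Thursday', 6: 'Friday', 7: 'Saturday'}
print(list(map(lambda d: day_names.get(d, 'FAIL'), [5, 6])))  # ['Thursday', 'Friday']
```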
I've read every example I can find online, but I don't see how to apply what the other questions describe to my case.
How can I add a column to a DataFrame by applying a function to another column?