
I am attempting to make a new column from another column in Apache Spark.

The data (heavily abbreviated) looks like

Date    Day_of_Week
2018-05-26T00:00:00.000+0000    5
2018-05-05T00:00:00.000+0000    6

and should look like

Date    Day_of_Week    Weekday
2018-05-26T00:00:00.000+0000    5    Thursday
2018-05-05T00:00:00.000+0000    6    Friday

I have tried the advice from the manual (https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#register-the-function-as-a-udf) as well as the questions "How to pass a constant value to Python UDF?" and "PySpark add a column to a DataFrame from a TimeStampType column",

which resulted in:

def int2day (day_int):
  if day_int == 1:
    return 'Sunday'
  elif day_int == 2:
    return 'Monday'
  elif day_int == 3:
    return 'Tuesday'
  elif day_int == 4:
    return 'Wednesday'
  elif day_int == 5:
    return 'Thursday'
  elif day_int == 6:
    return 'Friday'
  elif day_int == 7:
    return 'Saturday'
  else:
    return 'FAIL'

spark.udf.register("day", int2day, IntegerType())
df2 = df.withColumn("Day", day("Day_of_Week"))

and gives a long error:

SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 262, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 257, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 325, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 141, in dump_stream
    self._write_with_length(obj, stream)
  File "/databricks/spark/python/pyspark/serializers.py", line 151, in _write_with_length
    serialized = self.dumps(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 556, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I don't see how to apply "How to pass a constant value to Python UDF?" here, as that example was much simpler (only true or false).

I've also tried using map functions, as in "PySpark add a column to a DataFrame from a TimeStampType column", but

df3 = df2.withColumn("weekday", map(lambda x: int2day, col("Date")))

just raises TypeError: argument 2 to map() must support iteration, though I thought col does support iteration.

I've read every example online I can find. I don't see how I can apply what other questions have asked to my case.

How can I add another column, using a function of another column?

con
    You [shouldn't use a `udf` for this](https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance/38297050#38297050). See [this post](https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else) on how to do IF-THEN-ELSE logic. – pault Oct 26 '18 at 16:40
  • 1
    If you did want to use `udf` , your syntax is incorrect. The return type should be `StringType()` instead of integer. You can refer to [this post](https://stackoverflow.com/questions/52522057/pyspark-udf-column-on-dataframe) for an example on the correct syntax. – pault Oct 26 '18 at 16:54

1 Answer


You shouldn't need a UDF here at all to accomplish what you're trying to do. You can leverage the built-in pyspark date_format function to extract the name for each day of the week given the date in a column.

import pyspark.sql.functions as func
df = df.withColumn("day_of_week", func.date_format(func.col("Date"), "EEEE"))

The result is a new column added to your dataframe called day_of_week that will display Sunday, Monday, Tuesday etc. based on the value in the Date column.

vielkind
  • It doesn't look like the `"Date"` column is a `DateType` in this example. It's likely a `StringType` that [first needs to be converted](https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format) before you can use `date_format`. – pault Oct 26 '18 at 16:51