Your error comes from the fact that you need to pass a Column object to withColumn.
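For instance (a minimal illustration, assuming df is your DataFrame and "CONSTANT" is just a throwaway column name), a literal wrapped with lit() is a valid Column, whereas a plain Python list is not:
from pyspark.sql.functions import lit

df = df.withColumn("CONSTANT", lit(2020))  # works: lit() returns a Column
# df = df.withColumn("YEARS", dates)       # fails: a Python list is not a Column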
Here are two ways to add your dates as a new column on a Spark DataFrame (the join is made using the order of the records in each), depending on the size of your dates data.
1) If you manipulate a small dataset
A concise way to achieve it is to apply a UDF to a monotonically increasing id:
from pyspark.sql.functions import udf, monotonically_increasing_id

df = [...]  # 10 records
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

df = df.repartition(1).withColumn(
    "YEARS",
    udf(lambda id: dates[id])(monotonically_increasing_id()))
df.show()
outputs:
+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+
Note: The .repartition(1) ensures that the generated ids are consecutive. This repartitioning to a single partition can be avoided if you have another way to map each record to a value in dates (like a previously built id column, see the sketch just below). In this use case, since we expect the Python list to be quite small, your DataFrame should be quite small too, so this repartitioning is not a big deal.
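For example, a minimal sketch assuming your DataFrame already carries a consecutive, 0-based id column (named existing_id here, a hypothetical name):
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

# look each date up through the pre-existing consecutive id, no repartition(1) needed
year_from_id = udf(lambda i: dates[i], IntegerType())
df = df.withColumn("YEARS", year_from_id(col("existing_id")))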
/!\ Why it will not scale if the DataFrame and the Python list are too big:
- repartitioning the DataFrame requires shuffles/exchanges, which are expensive
- the .repartition(1) may produce one very large partition that is very slow to process (because it is huge, and because if it does not fit in execution memory it may require a lot of additional disk I/O to spill RDD blocks to disk), or it may make the job crash with an OutOfMemoryError
- the Python list is captured by the UDF (through the lambda closure), meaning that it will be shipped to every executor of your cluster (see the sketch after this list)
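A minimal sketch, reusing df and dates from above and assuming a SparkSession named spark, that makes this shipping explicit with a Spark broadcast variable so the list is sent once per executor rather than once per task (an optional optimization, not something your error requires):
from pyspark.sql.functions import udf, monotonically_increasing_id
from pyspark.sql.types import IntegerType

# broadcast the list explicitly so every executor receives it only once
dates_bc = spark.sparkContext.broadcast(dates)

year_udf = udf(lambda i: dates_bc.value[i], IntegerType())
df = df.repartition(1).withColumn("YEARS", year_udf(monotonically_increasing_id()))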
2) If you manipulate a dataset with millions of rows or more
Here is another approach that deals much better with millions of rows: it manipulates the ids and dates columns with pandas and avoids any repartitioning of the Spark DataFrame. It can be done like this:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
# some spark DataFrame of length N
df = [...]
# generate monotonically increasing ids (not consecutive) without repartitioning the Spark DataFrame
df = df.withColumn("id", monotonically_increasing_id())
# get the generated ids (not consecutive) as a single-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()
# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]
# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])
# append the id column to the dates in pandas (aligned on the pandas index, i.e. by row position)
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)
# convert from pandas DataFrame to Spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)
# finally add the dates column to the Spark DataFrame with a join on the id column
df.join(dates_and_ids_spark_df, ["id"]).show()
Important: The conversions to and from pandas can be made faster by enabling Apache Arrow.
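A minimal sketch of enabling it before the conversions (the exact configuration key depends on your Spark version):
# Spark 3.x; on Spark 2.3/2.4 the key is "spark.sql.execution.arrow.enabled"
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# toPandas() and createDataFrame(pandas_df) will then use Arrow for the conversion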