
I have a list:

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

The dataframe I try to add it to is the same length (no issues there).

I tried:

df = df.withColumn("YEARS", dates)
Error: Column needs to be col

I tried also:

df = df.withColumn("YEARS", f.lit(dates))

But that does not work either.

I saw this question: How to add a constant column in a Spark DataFrame?

But nothing there is useful for this case.

UPDATE: What the expected result is:

df_columns...   | dates_from_list
---------------------------------
original_df_data| 2017
original_df_data| 2018
original_df_data| 2018
original_df_data| 2018
original_df_data| 2019
original_df_data| 2019
original_df_data| 2019
original_df_data| 2020
original_df_data| 2020
original_df_data| 2020
Chiel
Toby Djelyinski

2 Answers


Your error comes from the fact that `withColumn` expects a `Column` object as its second argument, not a Python list.

Here are two ways to add your dates as a new column on a Spark DataFrame (matching records by their order in each), depending on the size of your dates data.

1) If you manipulate a small dataset

A concise way to achieve this is to apply a UDF to a monotonically increasing id:

from pyspark.sql.functions import udf, monotonically_increasing_id

df = [...]  # 10 records

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

df = df.repartition(1).withColumn(
    "YEARS", 
    udf(lambda id: dates[id])(monotonically_increasing_id()))

df.show()

outputs:

+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+

Note: The `.repartition(1)` ensures that the generated ids are consecutive. Repartitioning to a single partition can be avoided if you have another way to map each record to a value in `dates` (such as a previously built id column). In this use case, since we expect the Python list to be quite small, your DataFrame is also quite small, so this repartitioning is not a big deal.

/!\ Why this will not scale if the DataFrame and the Python list are too big:

  • repartitioning the DataFrame requires shuffles/exchanges, which are expensive
  • the `.repartition(1)` may produce a single massive partition that is very slow to process (because it is huge, and because if it does not fit in execution memory it may cause many additional disk I/Os to spill RDD blocks to disk), or may crash the job with an OutOfMemoryError
  • the Python list is captured by the UDF (via the lambda closure), meaning it will be shipped to every executor in your cluster

2) If you manipulate a dataset with millions of rows or more

Here is another approach that deals far better with millions of rows: manipulate the id and dates columns with pandas, avoiding any repartitioning of the Spark DataFrame.

It can be done like this:

import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# some spark DataFrame of length N
df = [...]  

# generate monotonically increasing ids (not consecutive) without repartitioning the Spark DataFrame
df = df.withColumn("id", monotonically_increasing_id())

# get the generated ids (not consecutive) as a single-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()

# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]

# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])

# attach the ids to the dates in pandas (rows are aligned by their index)
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)

# convert from pandas DataFrame to spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)

# Perform the final adding of the dates column to the Spark DataFrame with a join in Spark
df.join(dates_and_ids_spark_df, ["id"]).show()

Important: The conversions to and from pandas can be made faster by enabling Apache Arrow.

bonnal-enzo
  • I used 10 rows in my example for the sake of example. Does using this UDF cost a lot of efficiency if the dataset is 1-2 million rows ? (I usually code in Pandas, but am translating to PySpark because of the size of the datasets I am manipulating) – Toby Djelyinski Nov 13 '19 at 13:17
  • Even if you avoid the `.repartition(1)` by using another way to map your dataframe records to an element of your python list, there is another potentially huge cost that is clearly not cheap with millions of rows: the python list is captured by the udf (by the lambda closure), meaning that it will be broadcast to every executor. So at this scale it is probably preferable to work with pandas directly and then convert your pandas dataframe into a spark one: `spark.createDataFrame(pandas_df)`. Can be made faster with Apache Arrow enabled. You may want to accept this answer if it helps, and maybe open another question. – bonnal-enzo Nov 13 '19 at 13:35
  • 1
    Thanks, you can have a look to my edit for a more scalable alternative – bonnal-enzo Nov 13 '19 at 14:53
  • 1
    @EnzoBnl ..I like above approach. Here's a another similar approach but again list elements and dataframe row count has to be same. https://stackoverflow.com/questions/58188495/adding-a-list-element-as-a-column-to-existing-pyspark-dataframe/58225812#58225812 – vikrant rana Nov 14 '19 at 08:01

You can try this:

from pyspark.sql import Row
from pyspark.sql.functions import array, explode, lit

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
df = spark.createDataFrame([Row(a=1)])
df = df.withColumn("YEARS", array([lit(x) for x in dates]))


df.show(truncate=False)
+---+------------------------------------------------------------+
|a  |YEARS                                                       |
+---+------------------------------------------------------------+
|1  |[2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]|
+---+------------------------------------------------------------+

df.select("a", explode("YEARS")).show()
+---+----+
|  a| col|
+---+----+
|  1|2017|
|  1|2018|
|  1|2018|
|  1|2018|
|  1|2019|
|  1|2019|
|  1|2019|
|  1|2020|
|  1|2020|
|  1|2020|
+---+----+
Mustafa