There are two dataframes. The first one, df_table_a, is created by reading directly from a Hive table. The second one, df_table_a_slow, is created by applying a UDF transformation on top of df_table_a:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

df_table_a = spark.sql('SELECT INT_COL, DATE_WEEK FROM table_a')

# Shift each date back to the third Monday before it
shift_date_udf = F.udf(lambda day: day - pd.offsets.Week(3, weekday=0), DateType())
df_table_a_slow = df_table_a.withColumn('DATE_WEEK', shift_date_udf('DATE_WEEK'))
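(For context, I believe the same shift could be written with built-in date functions instead of a Python UDF; a minimal sketch, assuming DATE_WEEK is a DateType column holding arbitrary dates, might look like the following. I am still interested in why the UDF version specifically is so slow.)

# Sketch of a UDF-free equivalent (my assumption, not tested at scale):
# next_day(d - 1, 'Mon') is the first Monday on or after d, so subtracting
# 21 more days lands on the third Monday strictly before d, which matches
# day - pd.offsets.Week(3, weekday=0).
df_table_a_native = df_table_a.withColumn(
    'DATE_WEEK',
    F.date_sub(F.next_day(F.date_sub(F.col('DATE_WEEK'), 1), 'Mon'), 21)
)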
Then there is a third dataframe, df_table_b, which is also created by reading directly from a Hive table:
df_table_b = spark.sql('SELECT INT_COL, DATE_WEEK, OTHER_COLUMN FROM table_b')
Now we join df_table_b to both dataframes defined above:
df_fast_join = df_table_a.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')
df_slow_join = df_table_a_slow.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')
I wanted to compare the execution times of the two joins, so here is a helper function that approximates a transformation's cost:
import time

def time_transformation(df, subject):
    start = time.time()
    cnt = df.count()  # count() forces the lazy plan to actually execute
    end = time.time()
    print(f'>>> {subject} - {cnt}', end - start)
Results:
time_transformation(df_fast_join, 'fast join')
>>> fast join - 75739267 37.43
time_transformation(df_slow_join, 'slow join')
>>> slow join - 75739267 553.32
The UDF transformation itself does not seem to take much time:
time_transformation(df_table_a_slow, 'df_table_a_slow')
>>> df_table_a_slow - 75739267 0.25
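(One caveat I am aware of: since count() does not need the transformed column, Spark may be pruning it and skipping the UDF entirely here, so this number may understate the UDF's real cost. A sketch of a measurement that forces the column to be computed:)

# Force the UDF column to actually be evaluated before counting
time_transformation(df_table_a_slow.select('DATE_WEEK').distinct(), 'udf forced')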
The execution plans for both joins differ by a single line:
+- BatchEvalPython [<lambda>(DATE_WEEK#1)], [INT_COL#0, DATE_WEEK#1, pythonUDF0#843]
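(Both plans were obtained with explain(), e.g.:)

df_fast_join.explain()
df_slow_join.explain()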
Question: Why does applying a UDF to one of the dataframes slow down the join by more than 10 times? How can it be fixed?