
There are two dataframes. One, df_table_a, is created by reading directly from a Hive table. The other, df_table_a_slow, is created by applying a UDF transformation on top of df_table_a:

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import DateType

df_table_a = spark.sql('SELECT INT_COL, DATE_WEEK FROM table_a')

# Shift each date to the Monday three weeks back
shift_date_udf = F.udf(lambda day: day - pd.offsets.Week(3, weekday=0), DateType())
df_table_a_slow = df_table_a.withColumn('DATE_WEEK', shift_date_udf('DATE_WEEK'))

Then there is the df_table_b dataframe, which is also created by reading directly from a Hive table.

df_table_b = spark.sql('SELECT INT_COL, DATE_WEEK, OTHER_COLUMN FROM table_b')

Now we join df_table_b to both dataframes defined above.

df_fast_join = df_table_a.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')
df_slow_join = df_table_a_slow.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')

I wanted to time the execution time of both joins, so here is a function to approximate transformation times:

import time

def time_transformation(df, subject):
    # count() forces the lazy transformations to run so we can time them
    start = time.time()
    cnt = df.count()
    end = time.time()
    print(f'>>> {subject} - {cnt}', end - start)

Results:

time_transformation(df_fast_join, 'fast join')
>>> fast join - 75739267 37.43
time_transformation(df_slow_join, 'slow join')
>>> slow join - 75739267 553.32

The UDF transformation itself does not seem to take much time:

time_transformation(df_table_a_slow, 'df_slow')
>>> df_slow - 75739267 0.25
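
(Though, as a commenter notes below, this last number is probably misleading: count() lets the optimizer prune the UDF column away, so it mostly times the bare table scan. A hypothetical probe that aggregates over the transformed column would force the UDF to actually execute:)

start = time.time()
# F.max('DATE_WEEK') needs the UDF output, so the UDF cannot be pruned here
df_table_a_slow.select(F.max('DATE_WEEK')).collect()
print('>>> udf forced', time.time() - start)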

The execution plans for both joins differ by a single line:

+- BatchEvalPython [<lambda>(DATE_WEEK#1)], [INT_COL#0, DATE_WEEK#1, pythonUDF0#843]

Question: Why does applying a UDF to one of the dataframes slow down the join by more than 10 times? How can it be fixed?

Denys
  • UDFs are not native Spark functions, so they are always slower than the built-in Spark operations. Even if the job looks easy and tiny to you, it is not from Spark's point of view. – Lamanus Aug 02 '19 at 10:41
  • I suspect `df_table_a_slow` is not actually doing any computation, which is why it looks fast; try a collect instead, it will be a lot slower. – BlueSheepToken Aug 02 '19 at 15:04

2 Answers


UDFs are slow, especially in Python: DataFrames are essentially JVM objects, and Python UDFs are applied one row at a time, with the data serialized on every invocation. Have a look here. From the link you can read:

These user-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.
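
For illustration, a minimal sketch of that Scala route invoked from Python (the class name is hypothetical; the compiled jar must be on the Spark classpath, e.g. via spark.jars):

from pyspark.sql.types import DateType

# Hypothetical: com.example.ShiftDateUdf is a Scala class implementing
# UDF1[java.sql.Date, java.sql.Date], compiled and shipped in a jar
spark.udf.registerJavaFunction('shift_date', 'com.example.ShiftDateUdf', DateType())
df_scala = df_table_a.selectExpr('INT_COL', 'shift_date(DATE_WEEK) AS DATE_WEEK')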

LizardKing

How does a Python UDF work?

When a row is processed inside an executor, it is serialized and sent to a Python interpreter. There the row is deserialized and the UDF is applied to it, producing a modified row. That row is then serialized again and sent back to the executor JVM, which deserializes it and continues its work. This extra serialize-send-deserialize round trip for every single row slows processing down significantly, which is why Python UDFs are so slow.
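
One way to amortize that round trip is a vectorized (Arrow-based) pandas UDF, which ships whole column batches across the boundary instead of single rows. A minimal sketch, assuming Spark 3.x with PyArrow installed (the function name is illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DateType

@pandas_udf(DateType())
def shift_date_vec(day: pd.Series) -> pd.Series:
    # Dates arrive as a whole batch; the offset math runs once per batch,
    # not once per row
    return (pd.to_datetime(day) - pd.offsets.Week(3, weekday=0)).dt.date

df_vec = df_table_a.withColumn('DATE_WEEK', shift_date_vec('DATE_WEEK'))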

How to speed this up?

If you can achieve the same result with built-in SQL operators or a Scala UDF, it will run much faster.
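
In this particular case the UDF may not be needed at all. A sketch, on the assumption that DATE_WEEK already holds Monday-aligned week-start dates, so "three weeks back, anchored to Monday" reduces to subtracting 21 days:

import pyspark.sql.functions as F

# Pure Column arithmetic: this stays in the JVM, with no per-row Python
# round trip and no BatchEvalPython node in the plan
df_table_a_fast = df_table_a.withColumn('DATE_WEEK', F.date_sub('DATE_WEEK', 21))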

moriarty007