
I have a script snippet that I am running in different cluster setups on PySpark 2.4:

import os

v1 = spark.read.parquet(os.path.join(v1_prefix, 'df1.parquet'))
v2 = spark.read.parquet(os.path.join(v2_prefix, 'df2.parquet'))

out = v1.join(v2, [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])

# Compare each column of v1 against the corresponding column of v2
# and show the rows where the values differ.
for x in v1.columns:
    tmp = out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')).filter('{}_old != {}_new'.format(x, x))
    if tmp.count() > 0:
        tmp.show()

Both of these are DataFrames with 200+ columns and 1.5 million records, so the out DataFrame has 400+ columns that are compared pairwise to determine whether there are any differences.

  • a single-node cluster takes 4-8 minutes
  • a 2-node cluster takes ~50 minutes

I assume that in the 2-node cluster the data is partitioned across different executors and shuffled between them, which slows down the performance.
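One way to verify this assumption is to look at the physical plan of the join; an Exchange (shuffle) stage feeding a SortMergeJoin would confirm that both sides are being shuffled across executors:

# Print the physical plan of the joined DataFrame.
out.explain()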

How can I improve the out DataFrame so that it is evenly distributed and runs at least as fast on the cluster with Spark 2.4 as it does on a single node?

Leonid

1 Answer


A broadcast join should help.

import pyspark.sql.functions as F

# Broadcasting v2 ships a full copy of it to every executor, so the join
# no longer needs to shuffle both sides across the network.
out = v1.join(F.broadcast(v2), [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month])

A DataFrame with only 1.5 million rows should be small enough to be broadcast. The smaller DataFrame is the one that should be broadcast (it looks like both are roughly the same size in your example).
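If you would rather let Spark choose the broadcast side automatically instead of hinting, you can also raise the auto-broadcast threshold (a sketch; the 100 MB value is only an illustration, set it above the serialized size of the smaller DataFrame):

# 104857600 bytes = 100 MB; Spark broadcasts any join side estimated to be smaller than this.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)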

Try refactoring out.select(v1[x].alias(x + '_old'), v2[x].alias(x + '_new')) with the design pattern outlined here and here. You generally don't want to loop over all columns in a DataFrame.
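As a rough sketch of that idea (not necessarily the exact pattern from the linked posts), you can count mismatches for all columns in a single aggregation and only show() the columns that actually differ, rather than launching a separate count() job per column:

import pyspark.sql.functions as F

# One pass over `out` that counts mismatching rows for every column.
mismatch_counts = out.select([
    F.sum(F.when(v1[c] != v2[c], 1).otherwise(0)).alias(c) for c in v1.columns
]).collect()[0].asDict()

# Only inspect the columns that actually changed.
for c in [c for c, n in mismatch_counts.items() if n]:
    out.select(v1[c].alias(c + '_old'), v2[c].alias(c + '_new')).filter(v1[c] != v2[c]).show()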

Welcome to the PySpark world ;)

Powers
  • Thank you! I will try a broadcast join, but in my understanding it is not that small. Are there other options, like reducing or repartitioning dataframe by key, so each worker should have partitions of v1 and v2 dataframe on its side, which I assume has to increase performance? – Leonid Jul 30 '20 at 12:12
  • @Leonid - yep, there are tactics to optimize shuffle joins, but they're complex. Best to stick with broadcast joins if your DataFrames are small enough to be broadcasted. Report back with the runtime of the broadcast approach - I am curious! – Powers Jul 30 '20 at 12:18
  • I wouldn't say that the broadcast join gave any lift in performance in this case. But caching the out DataFrame worked perfectly. Also, using Spark 3.0 with AQE enabled gave a little lift (3.5 minutes vs 4-5 minutes w/o AQE). – Leonid Jul 30 '20 at 18:20
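For reference, a sketch of what that last comment describes (same variable names as above; AQE requires Spark 3.0+):

# On Spark 3.0+, adaptive query execution can coalesce shuffle partitions at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Cache the join result once so each per-column comparison reuses it
# instead of re-reading and re-joining the parquet files.
out = v1.join(v2, [v1.Id == v2.Id, v1.Year == v2.Year, v1.Month == v2.Month]).cache()
out.count()  # materialize the cache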