
Version: DBR 8.4 | Spark 3.1.2

While reading solutions to How to avoid shuffles while joining DataFrames on unique keys?, I've found a few mentions of the need to create a "custom partitioner", but I can't find any information on how to do that.

I've noticed that in the ~4-hour job I'm currently trying to optimize, most of the time goes to exchanging terabytes of data produced by a temporary cross-join-and-reduce operation.

Here is a visualization of the current operation:

[Image: Spark DAG of the current operation]

I'm hoping that by setting up the cross join with a "custom partitioner", I can force the ~29 billion rows it produces (which share the same 2-column primary key as the left-joined ~0.6 billion row table) to stay on the workers they were generated on until the whole dataset has been reduced to a mere 1 million rows. In other words, I'm hoping to avoid any shuffles during this stage.

The steps in the operation are:

  1. Generate a 28-billion-row temporary "TableA", partitioned by 'columnA', keyed by ['columnA', 'columnB'].
  2. Left join the 1-billion-row "TableB", also partitioned by 'columnA', keyed by ['columnA', 'columnB'] (effectively a sparse version of temporary TableA).
  3. Project a new column (in this specific case, TableC.columnC = TableA.columnC - Coalesce(TableB.columnC, 0)).
  4. Project a row-order column within each partition, e.g. F.row_number().over(Window.partitionBy(['columnA', 'columnB']).orderBy(F.col('columnC').desc())).
  5. Take the top N (say 2): keep only the rows with rank (row_number) < 3 (for example), throwing away the other 49,998 rows per partition.

Since all of these operations are performed independently within each ['columnA', 'columnB'] partition (no interaction between partitions), I was hoping there is some way to get through all 5 of those steps without ever reshuffling partitions between workers.
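Setting Spark aside for a moment, the per-key work in steps 3–5 is self-contained: each ['columnA', 'columnB'] group can be reduced to its top-N rows without looking at any other group, which is why no shuffle should be logically necessary. A minimal pure-Python sketch of that per-key logic (hypothetical data and function names, not Spark code):

```python
import heapq
from collections import defaultdict

def top_n_per_key(table_a, table_b, n=2):
    """Steps 3-5 for one partition's worth of rows:
    subtract TableB's columnC (coalesced to 0 when the key is missing),
    then keep the n largest results per (columnA, columnB) key."""
    # TableB is a sparse version of TableA: one columnC value per key
    b_lookup = {(a, b): c for (a, b, c) in table_b}
    diffs = defaultdict(list)
    for (a, b, c) in table_a:
        # Step 3: columnC = TableA.columnC - Coalesce(TableB.columnC, 0)
        diffs[(a, b)].append(c - b_lookup.get((a, b), 0))
    # Steps 4-5: rank within each key group and keep only the top n
    return {k: heapq.nlargest(n, v) for k, v in diffs.items()}
```

Every key's result depends only on rows carrying that same key, so as long as all rows for a given key sit on one worker, the reduction can run there locally.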


What I've tried:

  • I've tried not specifying any repartitioning instructions at all; this leads to the ~3.5-hour runtime and the DAG shown above.
  • I've tried explicitly specifying .repartition(9600, 'columnA') on the data sources on both sides of the join (excluding the broadcast-join case), right before joining. (Note that 9600 is configured as the default number of shuffle partitions.) This code change resulted in no changes to the query plan: there is still an exchange step both before and after the sort-merge join.
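For what it's worth, my understanding is that a partitioner is conceptually just a function from key to partition index, and that if both sides of a join are partitioned by the same function, matching keys land in the same partition. A toy pure-Python illustration of that co-location property (not Spark code, all names hypothetical):

```python
def partition_by(rows, key_fn, num_partitions):
    """Assign each row to a partition by hashing its key,
    mimicking what a hash partitioner on 'columnA' would do."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key_fn(row)) % num_partitions].append(row)
    return partitions

# Two "tables" sharing columnA (the first field) as the partitioning key
table_a = [("x", 1), ("y", 2), ("z", 3)]
table_b = [("x", 10), ("z", 30)]
key = lambda row: row[0]  # partition on columnA only

parts_a = partition_by(table_a, key, 4)
parts_b = partition_by(table_b, key, 4)

# Matching keys are co-located, so a join could run per partition
for i in range(4):
    keys_a = {key(r) for r in parts_a[i]}
    keys_b = {key(r) for r in parts_b[i]}
    assert keys_b <= keys_a  # every TableB key shares TableA's partition
```

What I can't find is how to tell the DataFrame API that both inputs are already partitioned this way, so that it skips the exchange.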