
I am trying to understand the local shuffle reader (a custom shuffle reader that reads shuffle files locally) used by the shuffle manager after the conversion of a SortMergeJoin into a BroadcastHashJoin. Let me first list my understanding.

Let's say I read two files into DataFrames, apply some filters and perform a join with AQE enabled. Based on the auto broadcast threshold and the size of the shuffle files written by the ShuffleMapStage, AQE converts the SMJ into a BHJ. The process is as follows:
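For reference, a minimal PySpark sketch of this scenario (the file paths, column names and threshold value are hypothetical; the two configuration keys are the standard Spark SQL ones for AQE and the broadcast threshold). This is a configuration sketch and needs a Spark installation to run:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-smj-to-bhj")
    # Enable Adaptive Query Execution so runtime shuffle statistics
    # from the map stages can be used to replan the join
    .config("spark.sql.adaptive.enabled", "true")
    # Broadcast threshold the optimizer compares relation sizes against
    .config("spark.sql.autoBroadcastJoinThreshold", "10m")
    .getOrCreate()
)

# Hypothetical inputs: two files read into DataFrames, filtered, then joined
left = spark.read.parquet("/data/left.parquet").filter("amount > 100")
right = spark.read.parquet("/data/right.parquet").filter("active = true")

# With AQE on, if the shuffle files written by the map stages show that one
# side is under the threshold, the SortMergeJoin is replanned as a
# BroadcastHashJoin at runtime
joined = left.join(right, "id")
joined.explain()
```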

  • AQE submits the initial plan --> 2 shuffle map stages are submitted by the DAGScheduler --> shuffle tasks for both stages are executed on two executors and shuffle files are written to the respective executor disks --> map statuses are sent to the MapOutputTracker

  • AQE replans the physical plan with a BHJ --> resubmits the plan --> missing stages are submitted by the DAGScheduler

  • Since the shuffle map stages are already done and their written files are available, those stages are skipped --> the shuffle files from the map stage of the relation to be broadcast are read, an RDD is built from them, collected to the driver, and broadcast to the executors that hold the partitions of the other dataset

  • A hash map is built on the broadcast data and a classic hash join is performed
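The last step can be illustrated with a plain-Python sketch (names and data are made up; this only models the build/probe phases of a broadcast hash join, not Spark's actual implementation):

```python
def broadcast_hash_join(small_side, large_side):
    """Model of a broadcast hash join: build a hash map on the small
    (broadcast) relation, then stream the large relation and probe it."""
    # Build phase: hash map keyed on the join key of the broadcast side
    build = {}
    for key, value in small_side:
        build.setdefault(key, []).append(value)

    # Probe phase: each executor holding a partition of the large side
    # probes its local copy of the hash map -- no shuffle of the large side
    result = []
    for key, value in large_side:
        for match in build.get(key, []):
            result.append((key, value, match))
    return result

small = [(1, "a"), (2, "b")]             # broadcast relation
large = [(1, "x"), (2, "y"), (3, "z")]   # streamed relation
print(broadcast_hash_join(small, large))
# -> [(1, 'x', 'a'), (2, 'y', 'b')]
```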

I have added the graph of the executed plan from the SQL tab of the Spark UI below (broken into two pictures since it was lengthy):


Now my questions are:

  1. Is my understanding correct?

  2. I can see CustomShuffleReader (local) used on both the left and right relations. Why is that? Does CustomShuffleReader (local) mean it will read from the respective local storage of the executors where the shuffle map stage was executed?

  3. Is it like this: if I disable the configuration shown in the screenshot, then the shuffle files for the data to be broadcast will be read from the reducer-side executors through the usual shuffleClient of the respective SortShuffleManager; and if I enable it, a CustomShuffleReader will be used that reads the dataset to be broadcast from the nodes where the shuffle files were written, instead of reading from the reducer side using reducer ids?
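To make question 3 concrete, here is a toy model (pure Python, entirely my own simplification, not Spark code) of the two read patterns: a regular shuffle read fetches one reducer partition from every map output across the cluster, while a local shuffle read consumes all reducer partitions of the map outputs sitting on the same executor, avoiding remote fetches:

```python
# Toy model of shuffle storage: executor -> list of map outputs,
# where each map output maps reducer_id -> records for that reducer
shuffle_files = {
    "exec-1": [{0: ["a1"], 1: ["b1"]}],
    "exec-2": [{0: ["a2"], 1: ["b2"]}],
}

def regular_shuffle_read(reducer_id):
    """Reducer-side read: fetch partition `reducer_id` from every
    map output on every executor (remote fetches over the network)."""
    records = []
    for executor, map_outputs in shuffle_files.items():
        for output in map_outputs:
            records.extend(output.get(reducer_id, []))
    return records

def local_shuffle_read(executor):
    """Local read: consume every reducer partition of the map outputs
    written on this executor, with no network transfer."""
    records = []
    for output in shuffle_files[executor]:
        for partition in output.values():
            records.extend(partition)
    return records

print(regular_shuffle_read(0))       # -> ['a1', 'a2']
print(local_shuffle_read("exec-1"))  # -> ['a1', 'b1']
```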

I have gone through all the discussion threads/blogs below and the doubt still exists:

akhil pathirippilly
