I am trying to understand the local shuffle reader (the custom shuffle reader that reads shuffle files locally) used by the shuffle manager after the conversion of a SortMergeJoin into a BroadcastHashJoin. Let me first lay out my understanding.
Let's say I read two files into DataFrames, apply some filters, and perform a join with AQE enabled. Based on the auto broadcast threshold and the size of the shuffle files written by the ShuffleMapStage, AQE converts the SMJ into a BHJ. The process is as follows:
AQE submits the initial plan --> the DAGScheduler submits 2 shuffle map stages --> shuffle tasks for both stages are executed on two executors and shuffle files are written to the respective executor disks --> the map output statuses are sent to the MapOutputTracker
AQE replans the physical plan with a BHJ --> resubmits the plan --> the DAGScheduler submits the missing stages
Since the shuffle map stages are already done and their written files are available, those stages are skipped --> the shuffle files of the join relation to be broadcast are read, an RDD is built from them, collected to the driver, and broadcast to the executors that hold the other dataset's partitions
A hash map is built on the broadcast data and a classic hash join is performed.
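For concreteness, here is a minimal sketch of the kind of job I am describing (the paths, column names, and threshold value are placeholders, not my actual job):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-smj-to-bhj")
  // Enable AQE so the physical plan can be re-optimized between stages
  .config("spark.sql.adaptive.enabled", "true")
  // AQE compares the runtime shuffle statistics against this threshold
  // when deciding whether to replan the SortMergeJoin as a BroadcastHashJoin
  .config("spark.sql.autoBroadcastJoinThreshold", "10m")
  .getOrCreate()

import spark.implicits._

// Hypothetical inputs: after the filter, one side's shuffle output
// falls under the broadcast threshold, so AQE replans SMJ -> BHJ
val left  = spark.read.parquet("/data/left").filter($"status" === "active")
val right = spark.read.parquet("/data/right")

left.join(right, Seq("id")).write.parquet("/data/out")
```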
I have added the graph of the executed plan from the SQL tab of the Spark UI below (split into two pictures since it was lengthy):
Now my questions are:
Is my understanding correct?
I can see CustomShuffleReader (local) used on both the left and right relations. Why is that? Does CustomShuffleReader (local) mean it reads from the respective local storage of the executors where the shuffle map stage was executed?
Is it the case that if I disable `spark.sql.adaptive.localShuffleReader.enabled`, the shuffle files for the data to be broadcast are read on the reducer side through the usual shuffle client of the respective SortShuffleManager, and if I enable it, a CustomShuffleReader reads the dataset to be broadcast from the node where the shuffle files were written, instead of fetching them on the reducer side by reducer id?
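To probe the last two questions myself, I compared the final adaptive plans with the flag toggled; here is a sketch of what I ran (reusing the hypothetical `left`/`right` DataFrames from the snippet above, and assuming the flag can be flipped per query since it is a runtime SQL conf):

```scala
// Compare the final adaptive plan with the local shuffle reader on and off.
// The final plan is only known after execution, hence the collect() first.
def showFinalPlan(enabled: Boolean): Unit = {
  spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", enabled.toString)
  val df = left.join(right, Seq("id"))
  df.collect() // run the query so AQE finalizes the plan
  df.explain() // prints AdaptiveSparkPlan with isFinalPlan=true
}

showFinalPlan(enabled = true)  // shows the CustomShuffleReader (local) nodes
showFinalPlan(enabled = false) // expect ordinary (remote) shuffle reads here
```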
I have gone through all the discussion threads/blogs below and the doubt still exists:
- Why does spark shuffle when it is going to use broadcast while using Adaptive Query Execution
- https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3-local-shuffle-reader/read
- https://dev.to/yaooqinn/how-to-use-spark-adaptive-query-execution-aqe-in-kyuubi-2ek2