
I work on a project with Spark 2.4 on AWS S3 and EMR, and I have a left join between two huge datasets. The Spark execution is not stable; it fails frequently with memory issues.

The cluster has 10 machines of type m3.2xlarge; each machine has 16 vCores, 30 GiB of memory, and 160 GB of SSD storage.

I have a configuration like this:

          "--executor-memory",
          "6512M",
          "--driver-memory",
          "12g",
          "--conf",
          "spark.driver.maxResultSize=4g",
          "--conf",
          "spark.sql.autoBroadcastJoinThreshold=1073741824",

The left join happens between a left side of about 150 GB and a right side of around 30 GB, so there is a lot of shuffling. My solution would be to cut the right side into pieces small enough (say 1 GB each) so that, instead of shuffling, the data gets broadcast. The only problem is that after the first left join, the left side already contains the new columns from the right side, so the following left joins produce duplicated columns such as col1_right_1, col2_right_1, col1_right_2, col2_right_2, and I have to rename col1_right_1/col1_right_2 to col1_left and col2_right_1/col2_right_2 to col2_left. A sketch of what I have in mind is below.
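Roughly, the chunked-broadcast idea looks like this sketch (Scala, Spark 2.4). The names `left`, `right`, the join key `key`, and the `payload` columns are placeholders, not my real schema:

    // A rough sketch, assuming Spark 2.4 in Scala; `left`, `right`, `key`, and
    // `payload` are placeholder names for the real tables and columns.
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{broadcast, coalesce, col, hash, lit, pmod}

    def chunkedBroadcastLeftJoin(left: DataFrame, right: DataFrame,
                                 key: String, payload: Seq[String],
                                 numChunks: Int): DataFrame = {
      // Split the right side into pieces small enough to broadcast.
      val chunks = (0 until numChunks).map { i =>
        right.where(pmod(hash(col(key)), lit(numChunks)) === i)
      }

      // Left-join each broadcast chunk, renaming its payload columns so that
      // successive joins do not clash (this is where col1_right_1, col1_right_2, ... come from).
      val joined = chunks.zipWithIndex.foldLeft(left) { case (acc, (chunk, i)) =>
        val renamed = payload.foldLeft(chunk)((df, c) => df.withColumnRenamed(c, s"${c}_$i"))
        acc.join(broadcast(renamed), Seq(key), "left")
      }

      // Each key can match in at most one chunk, so coalesce the per-chunk copies
      // back into a single column and drop the intermediate ones.
      payload.foldLeft(joined) { (df, c) =>
        df.withColumn(c, coalesce((0 until numChunks).map(i => col(s"${c}_$i")): _*))
          .drop((0 until numChunks).map(i => s"${c}_$i"): _*)
      }
    }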

So I wonder: why does Spark allow the shuffle to happen instead of using broadcast everywhere? Shouldn't broadcast always be faster than shuffle? Why doesn't Spark do the join the way I described, cutting one side into small pieces and broadcasting them?

mingzhao.pro

1 Answer


Let’s look at the two options. If I understood correctly, you are performing a broadcast and a join for each piece of the dataframe, where the size of each piece is the max broadcast threshold. The advantage here is that you are basically sending only one dataframe over the network, but you are performing multiple joins, and each join has an overhead. From:

Once the broadcasted Dataset is available on an executor machine, it is joined with each partition of the other Dataset. That is, for the values of the join columns for each row (in each partition) of the other Dataset, the corresponding row is fetched from the broadcasted Dataset and the join is performed.

This means that for each batch of the broadcast join, every partition of the other dataset has to be scanned again to perform the join.

Sort-merge or shuffle hash joins have to perform a shuffle (if the datasets are not already partitioned the same way), but the joins themselves are far more efficient.
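As a quick way to compare the two options in practice, you can check which strategy Spark actually picks and force or disable the broadcast yourself. A minimal sketch to run in spark-shell, with `left`, `right`, and the key `id` as placeholder names:

    import org.apache.spark.sql.functions.broadcast

    // Let Spark decide: it broadcasts the right side only if its estimated size
    // is below spark.sql.autoBroadcastJoinThreshold.
    left.join(right, Seq("id"), "left").explain()
    // Look for "BroadcastHashJoin" vs "SortMergeJoin" in the physical plan.

    // Force a broadcast regardless of the estimated size:
    left.join(broadcast(right), Seq("id"), "left").explain()

    // Disable automatic broadcasting entirely, so only shuffle-based joins are used:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)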

LizardKing
  • I see. Broadcast will take more time, but the upside is that the pressure on RAM will be lower, so the job is less likely to fail. So maybe broadcast is better if we prefer a stable application, and the cost is a longer processing time, am I right? – mingzhao.pro Dec 09 '19 at 11:26
  • It may be, but I am not sure it is the best approach. You may be interested in reviewing step 4 of this [response](https://stackoverflow.com/a/37849488/6654745). It could be interesting to compare the two solutions. – LizardKing Dec 09 '19 at 16:26