12

I'm writing a DataFrame to S3 as a Parquet file. In the Spark UI, I can see that all but one of the tasks in the write stage complete quickly (e.g. 199/200). The last task seems to take forever to complete, and very often it fails because the executor exceeds its memory limit.

I'd like to know what is happening in this last task. How can I optimize it? Thanks.

user2680514
  • I notice that the executor running this last task has much more shuffle read than the executors whose tasks completed. Does this mean the partitioning is not optimal? How can I avoid it? – user2680514 Aug 04 '15 at 19:04
  • I am using Spark 1.3.1 – user2680514 Aug 04 '15 at 19:13
  • To determine whether data skew is the problem we need more info on the size of that last file vs the others. Given what you said about OOM errors, I think data skew is the problem. Without some code it will be hard to help in anything but a try-this try-that way. – BAR Oct 05 '15 at 16:46
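
One way to get the size information asked for in the comment above is to count rows per partition just before the write. This is only a minimal sketch: `partition_row_counts` and `skew_ratio` are hypothetical helper names, and `df` is assumed to be the DataFrame that is about to be written.

```python
from statistics import median


def partition_row_counts(df):
    """Return the number of rows in each partition of a Spark DataFrame."""
    # Count inside each partition so only one integer per partition
    # is sent back to the driver.
    return df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()


def skew_ratio(counts):
    """Largest partition divided by the median partition (1.0 = balanced)."""
    if not counts or max(counts) == 0:
        return 1.0
    mid = median(counts)
    return float("inf") if mid == 0 else max(counts) / mid


# Usage, assuming an existing DataFrame `df`:
#   counts = partition_row_counts(df)
#   print(max(counts), skew_ratio(counts))
```

A ratio far above 1 would be consistent with a single oversized partition, which matches the one slow task described in the question.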

4 Answers

8

I tried Glennie Helles Sindholt's solution and it works very well. Here is the code:

path = 's3://...'
n = 2  # number of partitions after repartitioning; start with 2 to test
spark_df = spark_df.repartition(n)
spark_df.write.mode("overwrite").parquet(path)
bcosta12
  • Normally I'd flag this as a "Thanks" kind of answer which could be deleted... but Glennie's answer doesn't actually include any code. So this provides a good example and in my opinion is a positive contribution here. Thanks for contributing Bernardo! :) – JeremyW Jun 21 '19 at 18:46
5

It sounds like you have a data skew. You can fix this by calling repartition on your DataFrame before writing to S3.
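
To illustrate why one task out of 200 can end up with almost all of the data, and why redistributing rows evenly helps, here is a toy model in plain Python. It is not Spark's actual partitioner; the function names and the key distribution (one "hot" key with 9000 rows plus 1000 distinct keys) are made up for the example.

```python
from collections import Counter


def hash_partition_sizes(keys, n):
    """Rows per partition when each row goes to hash(key) % n."""
    counts = Counter(hash(k) % n for k in keys)
    return [counts.get(i, 0) for i in range(n)]


def round_robin_sizes(num_rows, n):
    """Rows per partition when rows are dealt out evenly, ignoring keys."""
    base, extra = divmod(num_rows, n)
    return [base + (1 if i < extra else 0) for i in range(n)]


# Hypothetical data: key 0 appears 9000 times, keys 1..1000 appear once each.
keys = [0] * 9000 + list(range(1, 1001))

skewed = hash_partition_sizes(keys, 200)
even = round_robin_sizes(len(keys), 200)
print(max(skewed), max(even))
```

In this model every row with the hot key lands in the same partition, so one task does most of the work. An even redistribution, which is roughly what `repartition(n)` without a key aims for, spreads the same rows across all tasks.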

Glennie Helles Sindholt
2

As others have noted, data skew is likely at play.

Besides that, I noticed that your task count is 200.

The configuration parameter spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations.

200 is the default for this setting, but generally it is far from an optimal value.

For small data, 200 could be overkill and you would waste time in the overhead of multiple partitions.

For large data, 200 can result in large partitions, which should be broken down into more, smaller partitions.

Very rough rules of thumb: have 2-3 times as many partitions as CPU cores, or aim for about 128 MB per partition.
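
The two rules of thumb can be combined into a quick back-of-the-envelope calculation. This is a sketch only: `suggest_partitions` is a hypothetical helper, taking the larger of the two estimates is my own assumption, and the defaults (128 MB target, 3 partitions per core) are just the rough numbers mentioned above.

```python
import math


def suggest_partitions(total_bytes, cores, target_bytes=128 * 1024**2, per_core=3):
    """Rough partition count from data size and available cores."""
    by_size = math.ceil(total_bytes / target_bytes)  # ~128 MB per partition
    by_cores = cores * per_core                      # 2-3x the core count
    # Assumption: use whichever rule asks for more partitions.
    return max(by_size, by_cores, 1)


# Example: 10 GiB of shuffle data on a cluster with 16 cores in total.
print(suggest_partitions(10 * 1024**3, 16))
```

Treat the result as a starting point to test against, not as a tuned value.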

2 GB is the maximum partition size. Also be aware if you are hovering just below 2000 partitions: Spark uses a different data structure for shuffle bookkeeping when the number of partitions is greater than 2000 [1]:

private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
...

You can try playing with this parameter at runtime:

spark.conf.set("spark.sql.shuffle.partitions", "300")
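
Note that `spark.conf.set` assumes a `SparkSession` (Spark 2.0+). The question mentions Spark 1.3.1, where the setting would go through `SQLContext.setConf` instead. A small sketch that handles both cases; `set_shuffle_partitions` is a hypothetical helper name, and the object passed in is assumed to be an existing `SparkSession` or `SQLContext`.

```python
def set_shuffle_partitions(session_or_context, n):
    """Set spark.sql.shuffle.partitions on a SparkSession or a SQLContext."""
    value = str(n)
    if hasattr(session_or_context, "conf"):
        # SparkSession (Spark 2.0+) exposes runtime settings through .conf
        session_or_context.conf.set("spark.sql.shuffle.partitions", value)
    else:
        # SQLContext / HiveContext (Spark 1.x) uses setConf
        session_or_context.setConf("spark.sql.shuffle.partitions", value)
    return value


# Usage, assuming `spark` (2.x) or `sqlContext` (1.x) already exists:
#   set_shuffle_partitions(spark, 300)
#   set_shuffle_partitions(sqlContext, 300)
```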

[1] What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

Ryan
0

This article, The Bleeding Edge: Spark, Parquet and S3, has a lot of useful information about Spark, S3 and Parquet. In particular, it talks about how the driver ends up writing out the _common_metadata files, which can take quite a bit of time. There is a way to turn it off.
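
As far as I know, the switch in question is the Parquet Hadoop property `parquet.enable.summary-metadata`. The answer above does not name it, so treat this as an assumption and check it against your Spark and Parquet versions (newer Spark releases may already skip the summary files by default). A minimal PySpark sketch, where `disable_parquet_summary_metadata` is a hypothetical helper and `sc` is an existing SparkContext:

```python
def disable_parquet_summary_metadata(sc):
    """Ask Parquet not to write _metadata / _common_metadata summary files."""
    # sc._jsc is PySpark's handle to the underlying JavaSparkContext.
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("parquet.enable.summary-metadata", "false")


# Usage, assuming an existing SparkContext `sc`, before calling df.write...:
#   disable_parquet_summary_metadata(sc)
```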

Unfortunately, they say that they go on to generate the common metadata themselves, but don't really talk about how they did so.

retnuH