As others have noted, data skew is likely at play.
Besides that, I noticed that your task count is 200.
The configuration parameter `spark.sql.shuffle.partitions` controls the number of partitions used when shuffling data for joins or aggregations. 200 is the default for this setting, but it is rarely the optimal value.
For small data, 200 partitions are overkill and you waste time on the scheduling overhead of many tiny tasks. For large data, 200 partitions can each become very large and should be broken down into more, smaller partitions.
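For reference, a quick way to see where the 200 tasks come from (a minimal spark-shell sketch; the shell provides `spark`, and `df` is a throwaway example DataFrame standing in for your own data):

```scala
// Throwaway example data; replace with your own DataFrame.
val df = spark.range(0, 10000).selectExpr("id % 100 as some_key", "id as value")

println(spark.conf.get("spark.sql.shuffle.partitions")) // "200" unless overridden

// A wide transformation (groupBy, join, ...) shuffles into that many partitions,
// which is where the 200 tasks in your shuffle stage come from.
val aggregated = df.groupBy("some_key").count()
println(aggregated.rdd.getNumPartitions)
```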
The really rough rules of thumb are (see the sizing sketch below):
- use 2-3x as many partitions as you have CPU cores, or
- aim for roughly 128 MB per partition.
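Applied to a concrete (made-up) cluster, the arithmetic looks like this; the core count and shuffle volume are placeholders you would replace with your own numbers, and `spark` is the same session as above:

```scala
// Back-of-the-envelope sizing based on the two rules of thumb.
val totalCores        = 16 * 4                   // e.g. 16 executors x 4 cores each
val shuffleBytes      = 50L * 1024 * 1024 * 1024 // e.g. ~50 GB being shuffled
val targetPartitionMB = 128L

val byCores = totalCores * 3                                           // 2-3x the cores
val bySize  = (shuffleBytes / (targetPartitionMB * 1024 * 1024)).toInt // ~128 MB each

// Take the larger of the two so partitions stay under the target size.
val numShufflePartitions = math.max(byCores, bySize)
spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions.toString)
```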
2 GB is the maximum partition size. Also, if you are hovering just below 2000 partitions, be aware that Spark switches to a different data structure for shuffle bookkeeping once the number of partitions exceeds 2000 [1]:
```scala
private[spark] object MapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
  ...
```
You can try playing with this parameter at runtime:

```scala
spark.conf.set("spark.sql.shuffle.partitions", "300")
```
[1] What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?