3

Can anyone explain how the skew data is handled in production for Apache spark?

Scenario:

We submitted the spark job using "spark-submit" and in spark-ui it is observed that few tasks are taking long time which indicates presence of skew.

Questions:

(1) What steps shall we take(re-partitioning,coalesce,etc.)?

(2) Do we need to kill the job and then include the skew solutions in the jar and re-submit the job?

(3) Can we solve this issue by running the commands like (coalesce) directly from shell without killing the job?

Rahul
  • 399
  • 4
  • 10

2 Answers2

3

Data skews a primarily a problem when applying non-reducing by-key (shuffling) operations. The two most common examples are:

  • Non-reducing groupByKey (RDD.groupByKey, Dataset.groupBy(Key).mapGroups, Dataset.groupBy.agg(collect_list)).
  • RDD and Dataset joins.

Rarely, the problem is related to the properties of the partitioning key and partitioning function, with no per-existent issue with data distribution.

// All keys are unique - no obvious data skew
val rdd = sc.parallelize(Seq(0, 3, 6, 9, 12)).map((_, None))

// Drastic data skew
rdd.partitionBy(new org.apache.spark.HashPartitioner(3)).glom.map(_.size).collect
// Array[Int] = Array(5, 0, 0)

What steps shall we take(re-partitioning,coalesce,etc.)?

Repartitioning (never coalesce) can help you with the the latter case by

  • Changing partitioner.
  • Adjusting number of partitions to minimize possible impact of data (here you can use the same rules as for associative arrays - prime number and powers of two should be preferred, although might not resolve the problem fully, like 3 in the example used above).

The former cases typically won't benefit from repartitioning much, because skew is naturally induced by the operation itself. Values with the same key cannot be spread multiple partitions, and non-reducing character of the process, is minimally affected by the initial data distribution.

These cases have to be handled by adjusting the logic of your application. It could mean a number of things in practice, depending on the data or problem:

  • Removing operation completely.
  • Replacing exact result with an approximation.
  • Using different workarounds (typically with joins), for example frequent-infrequent split, iterative broadcast join or prefiltering with probabilistic filter (like Bloom filter).

Do we need to kill the job and then include the skew solutions in the jar and re-submit the job?

Normally you have to at least resubmit the job with adjust parameters.

In some cases (mostly RDD batch jobs) you can design your application, to monitor task execution and kill and resubmit particular job in case of possible skew, but it might hard to implement right in practice.

In general, if data skew is possible, you should design your application to be immune to data skews.

Can we solve this issue by running the commands like (coalesce) directly from shell without killing the job?

I believe this is already answered by the points above, but just to say - there is no such option in Spark. You can of course include these in your application.

1
  1. We can fine tune the query to reduce the complexity .

  2. We can Try Salting mechanism:

Salt the skewed column with random number creation better distribution of data across each partition.
  1. Spark 3 Enables Adaptive Query Execution mechanism to avoid such scenarios in production.

Below are couple of spark properties which we can fine tune accordingly.

spark.sql.adaptive.enabled=true
spark.databricks.adaptive.autoBroadcastJoinThreshold=true #changes sort merge join to broadcast join  dynamically , default size = 30 mb
spark.sql.adaptive.coalescePartitions.enabled=true #dynamically coalesced
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB default
spark.sql.adaptive.coalescePartitions.minPartitionSize=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=true # Default 2X number of cores
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=Default is 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256 MB
Deepak
  • 191
  • 1
  • 9
  • thanks Deepak , does this configuration works for spark-2.4.5 version ? – Shasu Mar 16 '22 at 11:39
  • 1
    I have not tried , as per me it will not work in spark 2.4.5. we must upgrade to 3.0.0 – Deepak Mar 23 '22 at 06:57
  • , any clue on this issue https://stackoverflow.com/questions/74035832/exception-occured-while-writing-delta-format-in-aws-s3 ? – Shasu Oct 12 '22 at 02:30