Using Spark 2.4.0. My production data is extremely skewed, so one of the tasks was taking 7x longer than everything else. I tried different strategies to even out the partitions so that all executors do roughly equal work:
- spark.default.parallelism
- reduceByKey(numPartitions)
- repartition(numPartitions)
My expectation was that all three of them would partition evenly; however, playing with some dummy non-production data on Spark local/standalone suggests that the first two balance the data better than repartition.
The data is as below (I am trying to do a simple reduce on the balance per account + ccy combination):
account}date}ccy}amount
A1}2020/01/20}USD}100.12
A2}2010/01/20}SGD}200.24
A2}2010/01/20}USD}300.36
A1}2020/01/20}USD}400.12
Expected result should be [A1-USD,500.24], [A2-SGD,200.24], [A2-USD,300.36]
Ideally these should be partitioned in 3 different partitions.
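For context, a minimal sketch of how such a `}`-delimited file can be read into `javaRDDWithoutHeader` (the `Balance` POJO, getters, and file path here are illustrative, not the production code):

```java
import java.io.Serializable;
import java.math.BigDecimal;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Assumed POJO; the real Balance class may have more fields.
class Balance implements Serializable {
    private final String account, date, ccy;
    private final BigDecimal amount;

    Balance(String account, String date, String ccy, BigDecimal amount) {
        this.account = account; this.date = date; this.ccy = ccy; this.amount = amount;
    }
    public String getAccount()    { return account; }
    public String getCcy()        { return ccy; }
    public BigDecimal getAmount() { return amount; }
}

// In the driver (sc is an assumed JavaSparkContext, "balances.txt" is a placeholder path):
JavaRDD<String> lines = sc.textFile("balances.txt");
String header = lines.first();                       // "account}date}ccy}amount"
JavaRDD<Balance> javaRDDWithoutHeader = lines
        .filter(line -> !line.equals(header))        // drop the header row
        .map(line -> {
            String[] f = line.split("\\}");          // '}' is the field delimiter
            return new Balance(f[0], f[1], f[2], new BigDecimal(f[3]));
        });
```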
The reduce itself:

```java
javaRDDWithoutHeader
        .mapToPair(new MyPairFunction())        // Balance -> Tuple2<DummyString(account, ccy), amount>
        .reduceByKey(new ReductionFunction());  // sum the amounts per (account, ccy) key
```
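For reference, a minimal sketch of what `MyPairFunction`, `ReductionFunction`, and the `DummyString` key amount to (simplified; the real classes may differ in detail):

```java
import java.io.Serializable;
import java.math.BigDecimal;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Key type; must implement equals()/hashCode() over (account, ccy) so that
// reduceByKey (and any HashPartitioner) groups the records correctly.
class DummyString implements Serializable {
    final String account, ccy;
    DummyString(String account, String ccy) { this.account = account; this.ccy = ccy; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof DummyString)) return false;
        DummyString d = (DummyString) o;
        return account.equals(d.account) && ccy.equals(d.ccy);
    }
    @Override public int hashCode() { return java.util.Objects.hash(account, ccy); }
    @Override public String toString() {
        return "DummyString{account='" + account + "', ccy='" + ccy + "'}";
    }
}

// Key each Balance by (account, ccy) and use the amount as the value.
class MyPairFunction implements PairFunction<Balance, DummyString, BigDecimal> {
    @Override public Tuple2<DummyString, BigDecimal> call(Balance balance) {
        return new Tuple2<>(new DummyString(balance.getAccount(), balance.getCcy()),
                            balance.getAmount());
    }
}

// Sum the two amounts for the same (account, ccy) key.
class ReductionFunction implements Function2<BigDecimal, BigDecimal, BigDecimal> {
    @Override public BigDecimal call(BigDecimal a, BigDecimal b) {
        return a.add(b);
    }
}
```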
Code to check the partitions (`pairRDD` is the reduced `JavaPairRDD<DummyString, BigDecimal>` from above; the `repartition(3)` line is only in play for option 4):

```java
System.out.println("b4 = " + pairRDD.getNumPartitions());
System.out.println(pairRDD.glom().collect());            // contents of each partition

JavaPairRDD<DummyString, BigDecimal> newPairRDD = pairRDD.repartition(3);
System.out.println("Number of partitions = " + newPairRDD.getNumPartitions());
System.out.println(newPairRDD.glom().collect());
```
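As an aside, a sketch of a more driver-friendly way to check partition sizes than `glom().collect()` (fine for this toy data set, not for production volumes):

```java
import java.util.Collections;
import java.util.List;

// Count records per partition on the executors; only the counts reach the driver.
List<String> sizes = newPairRDD
        .mapPartitionsWithIndex((idx, it) -> {
            long n = 0;
            while (it.hasNext()) { it.next(); n++; }
            return Collections.singletonList("partition " + idx + ": " + n + " record(s)").iterator();
        }, true)
        .collect();
System.out.println(sizes);
```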
The four options compared (how each one is wired up is sketched right after this list):

- Option 1: Doing nothing
- Option 2: Setting spark.default.parallelism to 3
- Option 3: reduceByKey with numPartitions = 3
- Option 4: repartition(3)
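Roughly how each option is wired up (a sketch; `pairs` stands for the un-reduced output of `mapToPair(new MyPairFunction())` above, and the local `SparkConf` settings are illustrative):

```java
import java.math.BigDecimal;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Option 2: fix the default shuffle parallelism for the whole application.
// With this set, reduceByKey() without an explicit numPartitions uses 3 partitions.
SparkConf conf = new SparkConf()
        .setAppName("skew-test")                  // placeholder app name
        .setMaster("local[*]")                    // local run; exact master/core count may differ
        .set("spark.default.parallelism", "3");
JavaSparkContext sc = new JavaSparkContext(conf);

// Option 3: ask reduceByKey for 3 partitions explicitly (hash-partitioned by key).
JavaPairRDD<DummyString, BigDecimal> option3 =
        pairs.reduceByKey(new ReductionFunction(), 3);

// Option 4: reduce with the defaults, then repartition the result.
JavaPairRDD<DummyString, BigDecimal> option4 =
        pairs.reduceByKey(new ReductionFunction()).repartition(3);
```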
For option 1, number of partitions = 2:

```
[
  [(DummyString{account='A2', ccy='SGD'},200.24), (DummyString{account='A2', ccy='USD'},300.36)],
  [(DummyString{account='A1', ccy='USD'},500.24)]
]
```

For option 2, number of partitions = 3:

```
[
  [(DummyString{account='A1', ccy='USD'},500.24)],
  [(DummyString{account='A2', ccy='USD'},300.36)],
  [(DummyString{account='A2', ccy='SGD'},200.24)]
]
```

For option 3, number of partitions = 3:

```
[
  [(DummyString{account='A1', ccy='USD'},500.24)],
  [(DummyString{account='A2', ccy='USD'},300.36)],
  [(DummyString{account='A2', ccy='SGD'},200.24)]
]
```

For option 4, number of partitions = 3 (one partition is empty):

```
[
  [],
  [(DummyString{account='A2', ccy='SGD'},200.24)],
  [(DummyString{account='A2', ccy='USD'},300.36), (DummyString{account='A1', ccy='USD'},500.24)]
]
```
Conclusion: options 2 (spark.default.parallelism) and 3 (reduceByKey(numPartitions)) balance the data much better than option 4 (repartition). The results are fairly deterministic; I never saw option 4 spread the data across all 3 partitions.
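My working assumption for the difference: `reduceByKey(numPartitions)` hash-partitions by the `(account, ccy)` key (it uses a `HashPartitioner` internally), whereas `repartition(3)` redistributes the existing rows round-robin from a random starting partition without looking at the keys, so with only three records one target partition can easily stay empty. A sketch of forcing the same hash layout explicitly on the reduced RDD:

```java
import java.math.BigDecimal;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

// Hash-partition the reduced RDD by its (account, ccy) key into 3 partitions.
// reduceByKey(new ReductionFunction(), 3) should give the same layout, since it
// also uses a HashPartitioner; whether each key lands in its own partition
// still depends on DummyString.hashCode().
JavaPairRDD<DummyString, BigDecimal> hashed = pairRDD.partitionBy(new HashPartitioner(3));
System.out.println(hashed.glom().collect());
```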
Questions:

- Is reduceByKey(numPartitions) genuinely better at balancing the data than repartition, or
- is this just because the sample data set is so small, or
- will this behavior be different when we submit via a YARN cluster?