I'm trying to aggregate a dataframe on multiple columns. I know that everything I need for the aggregation is within the partition; that is, there's no need for a shuffle because all of the data for the aggregation is local to the partition.
For example, if I have something like
// Rows are (store, prod, amt, units)
val sales = sc.parallelize(List(
  ("West", "Apple", 2.0, 10),
  ("West", "Apple", 3.0, 15),
  ("West", "Orange", 5.0, 15),
  ("South", "Orange", 3.0, 9),
  ("South", "Orange", 6.0, 18),
  ("East", "Milk", 5.0, 5))).repartition(2)
// Key by (store, prod); the value tuple becomes (sum amt, min amt, max amt, sum units)
val tdf = sales.map { case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.
  reduceByKey((x, y) => (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4))
println(tdf.toDebugString)
I get a result like
(2) ShuffledRDD[12] at reduceByKey at Test.scala:59 []
 +-(2) MapPartitionsRDD[11] at map at Test.scala:58 []
    |  MapPartitionsRDD[10] at repartition at Test.scala:57 []
    |  CoalescedRDD[9] at repartition at Test.scala:57 []
    |  ShuffledRDD[8] at repartition at Test.scala:57 []
    +-(1) MapPartitionsRDD[7] at repartition at Test.scala:57 []
       |  ParallelCollectionRDD[6] at parallelize at Test.scala:51 []
You can see the MapPartitionsRDD, which is good. But then there's the ShuffledRDD from reduceByKey, which I want to avoid: I want a per-partition summary, grouped by column values within each partition.
zero323's suggestion is tantalizingly close, but I need the "group by columns" functionality.
Referring to my sample above, I'm looking for the result that would be produced by
select store, prod, sum(amt), avg(units) from sales group by partition_id, store, prod
(I don't really need the partition id; that's just to illustrate that I want per-partition results.)
I've looked at lots of examples, but every debug string I've produced shows a shuffle, and I really hope to get rid of it. I guess I'm essentially looking for a groupByKeysWithinPartitions function.
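To make it concrete, here is a minimal sketch of the kind of per-partition "group by columns" I have in mind. It is plain Scala so it can be run without a cluster. The names PerPartitionAgg and aggregateWithinPartition are hypothetical (they are not part of Spark), and the sketch assumes that all distinct keys of one partition fit in memory at once.

```scala
import scala.collection.mutable

object PerPartitionAgg {
  type Row = (String, String, Double, Int)   // (store, prod, amt, units)
  type Agg = (Double, Double, Double, Int)   // (sum amt, min amt, max amt, sum units)

  // Hypothetical helper, not a Spark API: folds the rows of ONE partition into
  // a local map keyed by (store, prod), so nothing has to leave the partition.
  def aggregateWithinPartition(rows: Iterator[Row]): Iterator[((String, String), Agg)] = {
    // LinkedHashMap keeps keys in first-seen order, which makes the output stable
    val acc = mutable.LinkedHashMap.empty[(String, String), Agg]
    rows.foreach { case (store, prod, amt, units) =>
      val key = (store, prod)
      val merged: Agg = acc.get(key) match {
        case Some((sum, mn, mx, u)) =>
          (sum + amt, math.min(mn, amt), math.max(mx, amt), u + units)
        case None =>
          (amt, amt, amt, units)   // first row seen for this key
      }
      acc.update(key, merged)
    }
    acc.iterator
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the contents of a single partition
    val partition: Iterator[Row] = Iterator(
      ("West", "Apple", 2.0, 10),
      ("West", "Apple", 3.0, 15),
      ("West", "Orange", 5.0, 15))
    aggregateWithinPartition(partition).foreach(println)
  }
}
```

With the RDD above, I would expect to pass this as sales.mapPartitions(PerPartitionAgg.aggregateWithinPartition, preservesPartitioning = true). Since mapPartitions is a narrow transformation, it should add only a MapPartitionsRDD to the lineage rather than another ShuffledRDD. The trade-off is that a key appearing in several partitions yields one output row per partition, which is exactly the per-partition result I described.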