I have a sample file. For a given field, I am trying to find the total of a second field, the record count, and the list of values of a third field, using combineByKey. I understood how to meet the same requirement with aggregateByKey from this question; now I would like to understand combineByKey.
I tried the code below, which is the same as my aggregateByKey version, but I'm getting a type mismatch error. I'm not sure whether my types are correct for createCombiner, mergeValue, or mergeCombiner. Please help me get a good understanding of combineByKey.
Sample data:
44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79
Code for combineByKey:
val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))
def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)
def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
  println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
  (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}
def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
  println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
  (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
Error Message:
error: type mismatch;
found : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
^
The expected result is:
customerid, (orderids,..,..,....), totalamount, number of orderids
Using the provided sample data it will be:
(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))
The mismatch points to createCombiner. Could anyone please help me understand combineByKey?
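For reference, the error message says that createCombiner must accept the RDD's value type, (Int, Float), not the accumulator type. Below is a minimal sketch of how the three functions could be typed under that reading. To stay runnable without a Spark installation, it exercises them on plain Scala collections that stand in for two partitions of key 44. The names CombineByKeySketch, Order, Acc and combinePartition are hypothetical and not part of Spark, and the two-partition split is only an assumption for illustration.

```scala
object CombineByKeySketch {
  // V: the value type of the pair RDD, (orderId, amount)
  type Order = (Int, Float)
  // C: the combiner type, (order ids so far, running total, count)
  type Acc = (Seq[Int], Double, Int)

  // createCombiner: V => C, called with the FIRST value seen for a key in a partition
  val createCombiner: Order => Acc =
    v => (Seq(v._1), v._2.toDouble, 1)

  // mergeValue: (C, V) => C, folds each further value of that key into the combiner
  val mergeValue: (Acc, Order) => Acc =
    (acc, v) => (acc._1 :+ v._1, acc._2 + v._2, acc._3 + 1)

  // mergeCombiners: (C, C) => C, merges per-partition combiners for the same key
  val mergeCombiners: (Acc, Acc) => Acc =
    (a, b) => (a._1 ++ b._1, a._2 + b._2, a._3 + b._3)

  // Hypothetical helper imitating what happens for one key inside one partition.
  // Assumes `values` is non-empty.
  def combinePartition(values: Seq[Order]): Acc =
    values.tail.foldLeft(createCombiner(values.head))(mergeValue)

  def main(args: Array[String]): Unit = {
    // The sample rows for key 44, split arbitrarily into two stand-in partitions
    val partition1 = Seq((8602, 37.19f), (8331, 99.19f), (1919, 39.54f), (2682, 41.88f))
    val partition2 = Seq((7366, 66.54f), (3405, 81.09f), (9957, 94.79f))

    val result = mergeCombiners(combinePartition(partition1), combinePartition(partition2))
    println((44, result))

    // With Spark, the same three functions would be passed as:
    // rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
  }
}
```

The difference from aggregateByKey is that there is no zero value: the initial accumulator is built from the first value of each key by createCombiner, which is why its input type has to be (Int, Float).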