1

I have a sample file I am trying to find out for a given field total number of another field and its count and list of values from another field using combineByKey. I am trying to understand combineByKey, same requirement I understood from this question using aggregateByKey, now I would like to understand combineByKey.

I tried the below code which is the same as aggregateByKey but I'm getting an type mismatch error. I'm not sure whether my types are correct for createCombiner or mergeValue or mergeCombiner. Please help me to get a good understanding of combineByKey.

Sample data:

44,8602,37.19
44,8331,99.19
44,1919,39.54
44,2682,41.88
44,7366,66.54
44,3405,81.09
44,9957,94.79 

Code for combineByKey:

val rdd = sc.textFile("file:///../customer-orders.csv_sample").map(x => x.split(",")).map(x => (x(0).toInt, (x(1).toInt, x(2).toFloat)))

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

def mergeValue = (acc: (Seq[Int],Double,Int),xs: (Int,Float)) => {
  println(s"""mergeValue: (${acc._1} ++ ${Seq(xs._1)}, ${acc._2} +${xs._2},${acc._3} + 1)""")
  (acc._1 ++ Seq(xs._1), acc._2 + xs._2, acc._3 + 1)
}

def mergeCombiner = (acc1: (Seq[Int],Double,Int), acc2: (Seq[Int],Double,Int)) => {
  println(s"""mergeCombiner: (${acc1._1} ++ ${acc2._1}, ${acc1._2} +${acc2._2}, ${acc1._3} + ${acc2._3})""")
  (acc1._1 ++ acc2._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}

rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)

Error Message:

error: type mismatch;
found   : ((Seq[Int], Double, Int)) => ((Seq[Int], Double, Int), Int)
required: ((Int, Float)) => ?
rdd.combineByKey(createCombiner,mergeValue,mergeCombiner).collect().foreach(println)
                 ^

The expecting result is:

customerid, (orderids,..,..,....), totalamount, number of orderids

Using the provided sample data it will be:

(44,(List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.2200012207031,7))

Mismatch is pointing to createCombiner. Could anyone please help to me to understand combineByKey?

Shaido
  • 27,497
  • 23
  • 70
  • 73

3 Answers3

0

I'm not familiar with Spark.

I hope this might help you.

val array = Array((44,8602,37.19),(44,8331,99.19),(44,1919,39.54),(44,2682,41.88),(44,7366,66.54),(44,3405,81.09),(44,9957,94.79))

array.groupBy(_._1).map(e => (e._1, e._2.map(_._2).toList, e._2.map(_._3).sum))
//res1: scala.collection.immutable.Iterable[(Int, List[Int], Double)] = List((44,List(8602, 8331, 1919, 2682, 7366, 3405, 9957),460.21999999999997))

I see your this error is due to

def createCombiner = (tuple: (Seq[Int],Double, Int)) => (tuple,1)

I think createCombiner should take some Seq of tuples and return a tuples of Int and Seq (groupby)

def createCombiner = (tuple: Seq[(Int,Int, Double)]) => tuple.groupBy(_._1)

Hope this helps.

Puneeth Reddy V
  • 1,538
  • 13
  • 28
0

Here is the signature of combineByKey:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

mergeValue is of type (C, V) => C

Where C should be ((Seq[Int],Double, Int), Int) and V should be (Seq[Int],Double, Int)

Your mergeValue method has types C (Seq[Int],Double,Int) and V (Int,Float)

The type of mergeCombiner is also incorrect.

This should be (C, C) => C where C is ((Seq[Int],Double, Int), Int)

Terry Dactyl
  • 1,839
  • 12
  • 21
0

The problem here is the createCombiner function. Look at combineByKey:

combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

Simply said, C is the format you want to end up with ((Seq[Int], Double, Int)) and V what you begin with ((Int, Double)). Here I changed Float to Double since that is what is usually used in Spark. That means that the createCombiner function should look as follows:

def createCombiner = (tuple: (Int, Double)) => (Seq(tuple._1), tuple._2, 1)

Both mergeValue and mergeCombiner looks ok, however, you will not see any print statements in Spark if you execute the code on a cluster (see: Spark losing println() on stdout).

Shaido
  • 27,497
  • 23
  • 70
  • 73
  • Thanks the `createCombiner` which you provided worked as expected. So the understanding is I need to maintain the type `Double` or `Float` from the map itself and same should be applied to the V that is what i begin and that should be end up with same type. – Karthik Ramachandran Nov 14 '18 at 08:36
  • @KarthikRamachandran: Exactly, `createCombiner` should take data of the same format as what the RDD contains (the `V` part of the `RDD[(K,V)]` since the key (`K`) is used for grouping). This initial format should then be converted to the most basic combiner so the algorithm can start merging values into it (with `mergeValue`) and combine combiners (with `mergeCombiner`). – Shaido Nov 14 '18 at 09:29