
I have a flatMap that returns the sequence Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)). Now I need to call reduceByKey() on that sequence to find the minimum value for each key.

I tried .reduceByKey(a,min(b)) and .reduceByKey((a, b) => if (a._1 < b._1) a else b), but neither of them works.

This is my code:

for (i <- 1 to 5) {
  var graph = graph.flatMap { in => in match { case (x, y, zs) => (x, y) :: zs.map(z => (z, y)) } }
    .reduceByKey((a, b) => if (a._1 < b._1) a else b)
}

For each distinct key that the flatMap generates, I need the minimum value for that key. E.g., if the flatMap generates Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)), reduceByKey() should produce (20,1),(22,1),(23,6),(24,6).

Sarath Subramanian
varun
  • What's inside of `graph` at the beginning? – Oli Mar 29 '19 at 20:29
  • "neither of them are working" isn't really a question ... – Dima Mar 29 '19 at 20:47
  • It would be easier if you showed the type of `graph` before the code snippet. And sometimes, if not often, it might also be helpful to explain what your algorithm is trying to achieve. Maybe people know a simpler solution, or an implementation even exists somewhere. Your code, for example, looks like some graph algorithm – UninformedUser Mar 30 '19 at 04:23
  • @Dima Question is in the title... – alofgran Nov 12 '19 at 20:52

1 Answer


Here is the signature of reduceByKey:

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

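Basically, given an RDD of key/value pairs, you need to provide a function that reduces two values (and not the entire pairs) into one. In your attempt, a and b are already the Int values for a given key, so a._1 does not apply to them. Therefore, you can use it as follows: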

val rdd = sc.parallelize(Seq((20,6),(22,6),(23,6),(24,6),(20,1),(22,1)))
val result = rdd.reduceByKey((a, b) => if (a < b) a else b)
result.collect
// Array[(Int, Int)] = Array((24,6), (20,1), (22,1), (23,6))
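
To see the same idea without a Spark context, here is a minimal sketch using plain Scala collections. The helper `reduceByKeyLocal`, the object name, and the sample `graph` records are assumptions made for illustration (the records are chosen only so that the flatMap reproduces the sequence from the question); they are not part of Spark or of the original code.

```scala
// Plain-Scala sketch (no Spark needed). reduceByKeyLocal is a hypothetical
// helper that imitates the contract of RDD.reduceByKey on a local Seq.
object MinByKeySketch {
  // func combines two VALUES that share a key; it never sees the key itself.
  def reduceByKeyLocal[K, V](pairs: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }

  // Assumed record shape: (node, value, adjacent nodes).
  val graph: List[(Int, Int, List[Int])] = List(
    (20, 6, List(22, 23, 24)),
    (20, 1, List(22))
  )

  // Same flatMap as in the question: emit (x, y), then (z, y) for every z in zs.
  val pairs: List[(Int, Int)] =
    graph.flatMap { case (x, y, zs) => (x, y) :: zs.map(z => (z, y)) }

  // Keep the smaller of the two values for each key.
  val minima: Map[Int, Int] = reduceByKeyLocal(pairs)((a, b) => math.min(a, b))

  def main(args: Array[String]): Unit =
    println(minima.toList.sortBy(_._1)) // List((20,1), (22,1), (23,6), (24,6))
}
```

With an actual RDD, the same combining function should be usable directly, e.g. rdd.reduceByKey((a, b) => math.min(a, b)).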
Oli