
I've implemented a solution to group an RDD[K, V] by key and to compute data for each group (K, RDD[V]), using partitionBy and a Partitioner. Nevertheless, I'm not sure it is really efficient, and I'd like to have your point of view.

Here is a sample case: given a list of [K: Int, V: Int], compute the mean of the Vs for each group of K, knowing that it should be distributed and that there may be very many V values. That should give:

List[K, V] => (K, mean(V))

The simple Partitioner class:

class MyPartitioner(maxKey: Int) extends Partitioner {

  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

The partition code:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()

p.foreachPartition(x => {
  try {
    val r = sc.parallelize(x.toList)
    val id = r.first() // get the K partition id
    val v = r.map(x => x._2)
    println(id._1 + "->" + mean(v))
  } catch {
    case e: UnsupportedOperationException => 0
  }
})

The output is:

1->13, 2->4, 3->7

My questions are:

  1. What really happens when calling partitionBy? (Sorry, I didn't find enough specs on it.)
  2. Is it really efficient to map by partition, knowing that in my production case there would not be many keys (around 50) but very many values per key (around 1 million)?
  3. What is the cost of parallelize(x.toList)? Is it reasonable to do this? (I need an RDD as input to mean().)
  4. How would you do it yourself?

Regards

Seb

1 Answer


Your code should not work. You cannot pass the SparkContext object to the executors. (It's not Serializable.) Also I don't see why you would need to.

To calculate the mean, you need to calculate the sum and the count and take their ratio. The default partitioner will do fine.

import org.apache.spark.rdd.RDD

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
  case class SumCount(sum: Double, count: Double)
  // Single pass: accumulate a running sum and count per key.
  val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
  // Keep the key and turn each SumCount into the mean.
  sumCounts.mapValues(sc => sc.sum / sc.count)
}

This is an efficient single-pass calculation that generalizes well.
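
For instance, applying it to the sample data from the question would look roughly like this (a usage sketch, assuming sc is the SparkContext from the question's setup):

val data = sc.parallelize(List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)))
meanByKey(data).collect().foreach { case (k, m) => println(k + "->" + m) }
// Prints 1->13.0, 2->4.5 and 3->7.0 (in some order).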

Daniel Darabos
  • thank you for your answer. Of course it cannot work; I don't yet have all the Spark coding reflexes and I've been spoiled by my local JVM. Nevertheless, in fact I don't need to compute the mean but a complex ML method, and I need an RDD[Vector]. How could I get a list of (key, RDD[Vector]) from a single RDD[Int, Int]? I didn't find a solution. – Seb Feb 10 '15 at 09:52
  • I think this is a similar topic then: http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302 I'm not sure how you want to make `Vector`s from `Int`s. But if you want to get one RDD per key, you need to split the original RDD, and this is discussed in the linked answer. If it does not give you the answer, I suggest asking another question, perhaps with a clear, high-level explanation of what you want to do. – Daniel Darabos Feb 10 '15 at 12:36
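
A rough sketch of the "split the original RDD" idea mentioned in the comment above: filter the RDD once per key, which is workable when the number of distinct keys is small (the question mentions around 50). This splitByKey helper is hypothetical and not necessarily the approach of the linked answer.

import org.apache.spark.rdd.RDD

// Build one RDD per key by filtering the original RDD once for each key.
def splitByKey(rdd: RDD[(Int, Int)], keys: Seq[Int]): Map[Int, RDD[Int]] =
  keys.map(k => k -> rdd.filter(_._1 == k).values).toMap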