
Working with JavaPairRDD (key, value) pairs, I would like to process values associated with each key in a defined order (value comparator). Is it possible in Apache Spark?

Using Hadoop I would use secondary sort pattern. I am looking for a solution which can handle a set of values that doesn't fit in memory (even set of values with the same key)

tomek
  • Regarding your first need, the only way I'm aware of would be to use a value to key conversion pattern (I tried this in my youth, see http://codereview.stackexchange.com/questions/56641/producing-a-sorted-wordcount-with-spark). However, I'm not sure it is suited for large memory use. – merours Nov 03 '14 at 09:22

2 Answers


Here is an implementation from Sandy Ryza's Advanced Analytics with Spark:

(the full listing is available in the book's GitHub repository)

I renamed some of the variables and added some comments so it makes sense in a more general context (this snippet is used in the book to analyze taxi data and some variables were named accordingly).

  import org.apache.spark.rdd.RDD
  import scala.collection.mutable.ArrayBuffer
  import scala.reflect.ClassTag

  def groupByKeyAndSortValues[K: Ordering : ClassTag, V: ClassTag, S](
      rdd: RDD[(K, V)],
      secondaryKeyFunc: (V) => S,
      splitFunc: (V, V) => Boolean,
      numPartitions: Int): RDD[(K, List[V])] = {
    // Extract the secondary key by applying a function to the value.
    val presess = rdd.map {
      case (key, value) => ((key, secondaryKeyFunc(value)), value)
    }
    // Define a partitioner that picks a partition by the first
    // element of the new tuple key.
    val partitioner = new FirstKeyPartitioner[K, S](numPartitions)

    // Set the implicit ordering by the first element of the new
    // tuple key.
    implicit val ordering: Ordering[(K, S)] = Ordering.by(_._1)

    presess
      .repartitionAndSortWithinPartitions(partitioner)
      .mapPartitions(groupSorted(_, splitFunc))
  }

  /**
   * Groups the given iterator according to the split function. Assumes
   * the data comes in sorted.
   */
  def groupSorted[K, V, S](
      it: Iterator[((K, S), V)],
      splitFunc: (V, V) => Boolean): Iterator[(K, List[V])] = {
    val res = List[(K, ArrayBuffer[V])]()
    it.foldLeft(res)((list, next) => list match {
      case Nil =>
        val ((key, _), value) = next
        List((key, ArrayBuffer(value)))
      case cur :: rest =>
        val (curKey, valueBuf) = cur
        val ((key, _), value) = next
        if (!key.equals(curKey) || splitFunc(valueBuf.last, value)) {
          (key, ArrayBuffer(value)) :: list
        } else {
          valueBuf.append(value)
          list
        }
    }).map { case (key, buf) => (key, buf.toList) }.iterator
  }

Here is the Partitioner:

  import org.apache.spark.{HashPartitioner, Partitioner}

  class FirstKeyPartitioner[K1, K2](partitions: Int) extends Partitioner {
    // Delegate hashing to a HashPartitioner, but hash only the first
    // element of the composite (K1, K2) key, so all records with the
    // same primary key land in the same partition.
    val delegate = new HashPartitioner(partitions)
    override def numPartitions = delegate.numPartitions
    override def getPartition(key: Any): Int = {
      val k = key.asInstanceOf[(K1, K2)]
      delegate.getPartition(k._1)
    }
  }
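The grouping step is ordinary Scala and can be tried without a cluster. Below is a minimal, Spark-free sketch (the data, keys, and the trivial splitFunc are made up for illustration) that feeds an already-sorted iterator, as repartitionAndSortWithinPartitions would produce, through the same consecutive-grouping logic. Unlike the snippet above, this sketch reverses the accumulated list so groups come out in encounter order:

```scala
import scala.collection.mutable.ArrayBuffer

// Consecutive grouping over an iterator that is already sorted by
// (primary key, secondary key) -- the shape groupSorted expects.
def groupSortedDemo[K, V, S](
    it: Iterator[((K, S), V)],
    splitFunc: (V, V) => Boolean): List[(K, List[V])] = {
  it.foldLeft(List[(K, ArrayBuffer[V])]()) { (list, next) =>
    list match {
      case Nil =>
        val ((key, _), value) = next
        List((key, ArrayBuffer(value)))
      case (curKey, valueBuf) :: _ =>
        val ((key, _), value) = next
        if (!key.equals(curKey) || splitFunc(valueBuf.last, value)) {
          // New primary key (or split requested): start a new group.
          (key, ArrayBuffer(value)) :: list
        } else {
          // Same key: extend the current group in place.
          valueBuf.append(value)
          list
        }
    }
  }.reverse.map { case (key, buf) => (key, buf.toList) }
}

// Toy data: primary keys "a"/"b", Int secondary keys, never split.
val sorted = Iterator(
  (("a", 1), "a1"), (("a", 2), "a2"), (("b", 1), "b1"), (("b", 3), "b3"))

val grouped = groupSortedDemo(sorted, (_: String, _: String) => false)
// grouped == List(("a", List("a1", "a2")), ("b", List("b1", "b3")))
```

Within each group, the values appear in secondary-key order, which is exactly the ordering guarantee the question asks for.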
ryjm
  • even if that link contains the answer, it's better to copy the most important parts here – MZaragoza Mar 31 '15 at 14:26
  • Right, I remembered that right after posting but got sidetracked. I'll go ahead and edit it now, thanks for the reminder! – ryjm Mar 31 '15 at 14:28

There is an open issue for adding a secondary sort feature to Spark. Until then, one way to get a secondary sort is to swap the key and value, sort, and swap back:

rdd.map { case (key, value) => (value, key) }.sortByKey().map { case (value, key) => (key, value) }

sortByKey will not coalesce your keys, so the same key can still appear multiple times in the result.
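As a quick illustration with plain Scala collections (no Spark; sortBy stands in for sortByKey, and the data is made up), the swap moves the value into the sort position and then restores the original (key, value) shape:

```scala
// Swap value into the key position, sort, then swap back.
val pairs = List(("b", 2), ("a", 3), ("a", 1), ("b", 1))

val sortedByValue = pairs
  .map { case (k, v) => (v, k) }  // value becomes the sort key
  .sortBy(_._1)                   // stands in for Spark's sortByKey
  .map { case (v, k) => (k, v) }  // restore (key, value) shape

// sortedByValue == List(("a", 1), ("b", 1), ("b", 2), ("a", 3))
```

Note that keys "a" and "b" each still appear twice after sorting, which is the caveat about keys not being coalesced.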

Anant