
I have a set of records which I need to:

1) Group by 'day', 'city' and 'kind'

2) Sort every group by 'prize'

In my code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Sort {

  case class Record(name:String, day: String, kind: String, city: String, prize:Int)

  val recs = Array (
      Record("n1", "d1", "k1", "c1", 10),
      Record("n1", "d1", "k1", "c1", 9),
      Record("n1", "d1", "k1", "c1", 8),
      Record("n2", "d2", "k2", "c2", 1),
      Record("n2", "d2", "k2", "c2", 2),
      Record("n2", "d2", "k2", "c2", 3)
      )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)
    val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2)
    val x = rsGrp.map{r => 
      val lst = r.toList
      lst.map{e => (e.prize, e)}
      }
    x.sortByKey()
  }

}

When I try to sort the groups I get an error:

value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int, Sort.Record)]]

What is wrong? How can I sort?

zork
  • If you make the sort argument(s) part of the key, it looks like you might also be able to use repartitionAndSortWithinPartitions() to get 'tera-sort' scale. See http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions – steamer25 Apr 10 '15 at 14:46

4 Answers


You need to define a key and then use mapValues to sort the values.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

object Sort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  // Define your data (the recs array from the question)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .setMaster("local")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)

    // Generate the pair RDD necessary to call groupByKey, then group
    val key: RDD[((String, String, String), Iterable[Record])] = rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey

    // Once grouped, sort the values of each key
    val values: RDD[((String, String, String), List[Record])] = key.mapValues(iter => iter.toList.sortBy(_.prize))

    // Print result
    values.collect.foreach(println)
  }
}
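
With the recs array from the question, this should print something close to the following (the relative order of the two groups may differ):

((d1,c1,k1),List(Record(n1,d1,k1,c1,8), Record(n1,d1,k1,c1,9), Record(n1,d1,k1,c1,10)))
((d2,c2,k2),List(Record(n2,d2,k2,c2,1), Record(n2,d2,k2,c2,2), Record(n2,d2,k2,c2,3)))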
gasparms
  • I have read in the Spark documentation that groupBy is costly. Is there some other method through which we can achieve this more efficiently? – Sohaib Jun 11 '15 at 18:31
  • I don't know of a more efficient method for sorting the values. groupByKey is usually not used alone, because you typically reduce or apply some other operation to the values. – gasparms Jun 12 '15 at 07:10
  • This is converting the entire grouping into a List in memory. What if this grouping is very large? – oneirois May 09 '18 at 18:37

groupByKey is expensive; it has two implications:

  1. On average, the majority of the data gets shuffled into the remaining N-1 partitions.
  2. All of the records with the same key are loaded into memory on a single executor, potentially causing memory errors.

Depending on your use case, you have several better options:

  1. If you don't care about the ordering, use reduceByKey or aggregateByKey.
  2. If you just want to group and sort without any transformation, prefer repartitionAndSortWithinPartitions (Spark 1.3.0+, http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions), but be very careful about which partitioner you specify and test it, because you are now relying on side effects that may change behaviour in a different environment; a minimal sketch follows this list. See also the examples in this repository: https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala.
  3. If you are applying a transformation or a non-reducible aggregation (a fold or a scan) to the iterable of sorted records, check out the spark-sorted library (https://github.com/tresata/spark-sorted). It provides three APIs for paired RDDs: mapStreamByKey, foldLeftByKey and scanLeftByKey.
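
A minimal sketch of option 2 with a secondary-sort key, assuming the Record case class from the question; the RecordPartitioner class, the partition count and the local master are illustrative assumptions, not part of any API mentioned above:

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object SecondarySort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  // Route records by the grouping fields only, so every record of a group
  // ends up in the same partition.
  class RecordPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = key match {
      case (day: String, city: String, kind: String, _) =>
        ((day, city, kind).hashCode & Int.MaxValue) % numPartitions
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SecondarySort").setMaster("local"))

    val recs = Seq(
      Record("n1", "d1", "k1", "c1", 10),
      Record("n1", "d1", "k1", "c1", 9),
      Record("n2", "d2", "k2", "c2", 1))

    // Composite key: grouping fields plus prize, so the implicit tuple Ordering
    // sorts each group's records by prize within its partition.
    val keyed = sc.parallelize(recs).map(r => ((r.day, r.city, r.kind, r.prize), r))

    val sorted = keyed.repartitionAndSortWithinPartitions(new RecordPartitioner(2))

    sorted.values.collect.foreach(println)
  }
}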
Gianmario Spacagna

Replace map with flatMap

val x = rsGrp.flatMap{r =>
  val lst = r.toList
  lst.map{e => (e.prize, e)}
}

this will give you a

org.apache.spark.rdd.RDD[(Int, Record)] = FlatMappedRDD[10]

and then you can call sortBy(_._1) on the RDD above.
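
A quick usage sketch, assuming the x defined above; note that this orders the flattened (prize, Record) pairs by prize across the whole RDD, not within each group:

val sorted = x.sortBy(_._1)   // sortByKey() would also work now that x is a pair RDD
sorted.collect.foreach(println)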

Soumya Simanta

As an alternative to @gasparms' solution, I think one can try a filter followed by an rdd.sortBy operation: for each key, filter the records that match it and sort them by prize. The prerequisite is that you keep track of all your keys (the filter combinations); you can also build that set as you traverse the records. A minimal sketch follows.
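
A minimal sketch of this idea, assuming rs is the RDD[Record] from the question; collecting the distinct keys to the driver is my own simplification and only makes sense when the number of groups is small:

// Collect the distinct (day, city, kind) combinations, then sort each group separately.
val keys = rs.map(r => (r.day, r.city, r.kind)).distinct.collect

keys.foreach { case (day, city, kind) =>
  val group = rs
    .filter(r => r.day == day && r.city == city && r.kind == kind)
    .sortBy(_.prize)   // sort this group's records by prize
  group.collect.foreach(println)
}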

nir