
I have these Rows:

(key1,Illinois|111|67342|...)
(key1,Illinois|121|67142|...)
(key2,Hawaii|113|67343|...)
(key1,Illinois|211|67442|...)
(key3,Hawaii|153|66343|...)
(key3,Ohio|193|68343|...)

(1) How do I get the unique keys?

(2) How do I get the number of rows PER key? (key1 - 3 rows, key2 - 1 row, key3 - 2 rows... so the output would be: 3,1,2)

(3) How do I get the byte size of the rows PER key? (so the output would be: 5MB, 2MB, 3MB)


EDIT 1. This is my new code:

val rdd : RDD[(String, Array[String])] = ...
val rdd_res = rdd.groupByKey().map(row => (row._1, row._2.size, byteSize(row._2)))

val rddKeys = rdd_res.map(row => row._1)
val rddCount = rdd_res.map(row => row._2)     
val rddByteSize = rdd_res.map(row => row._3)

How do I implement the byteSize? I want to get the size that will be saved to disk.
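
(One possible byteSize, purely as a sketch and not part of the accepted approach: if the rows are eventually written back out as pipe-delimited text with saveAsTextFile, the on-disk size is roughly the UTF-8 length of each joined line plus a newline; compression or a different output format would change this.)

// Hypothetical sketch (an assumption): approximate the on-disk size if each
// Array[String] were written back out as one pipe-delimited UTF-8 line,
// plus a newline, via saveAsTextFile.
def byteSize(rows: Iterable[Array[String]]): Long =
  rows.map(fields => fields.mkString("|").getBytes("UTF-8").length.toLong + 1L).sum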


EDIT 2.

  val rdd_res : RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
    (accum, value) => (accum._1 + 1, accum._2 + size(value)),
    (first, second) => (first._1 + second._1, first._2 + second._2))

  val rdd_res_keys = rdd_res.map(row=>row._1).collect().mkString(",")
  val rdd_res_count = rdd_res.map(row=>row._2).collect().map(_._1).mkString(",")
  val rdd_res_bytes = rdd_res.map(row=>row._2).collect().map(_._2).mkString(",")
sophie

2 Answers


For the distinct keys, call keys first and then distinct:

rdd.keys.distinct.collect

But you technically get this anyway by counting the keys into a map via countByKey, which returns a Map of key -> count:

rdd.countByKey
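
For the sample rows in the question, a quick sketch of what those two calls give back (assuming rdd is the pair RDD from EDIT 1):

// Distinct keys, e.g. Array(key1, key2, key3) -- ordering is not guaranteed.
val uniqueKeys: Array[String] = rdd.keys.distinct.collect()

// Row count per key, e.g. Map(key1 -> 3, key2 -> 1, key3 -> 2).
// countByKey returns a Map on the driver, which is fine here since there are
// only around 100 distinct keys (per the comments below).
val countsPerKey: scala.collection.Map[String, Long] = rdd.countByKey()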

To get the byte size, you should review this SO question, as it is going to depend on the decoding. Once you've decided on a size method, you can get it via:

rdd.aggregateByKey(0)((accum, value) => accum + size(value), _ + _)

Or, you can do it all in one:

rdd.aggregateByKey((0, 0))(
  (accum, value) => (accum._1 + 1, accum._2 + size(value)),
  (first, second) => (first._1 + second._1, first._2 + second._2))

Which should yield an RDD[(String, (Int, Int))], where the first item in the value tuple is the row count for the key and the second is the total byte size for that key.
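
For reference, a minimal end-to-end sketch of that combined aggregation against (truncated) sample rows from the question, assuming a UTF-8 based size function; this is only one possible definition of size, and the names here are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// In spark-shell the SparkContext already exists as sc; this is for a standalone sketch.
val sc = new SparkContext(new SparkConf().setAppName("per-key-stats").setMaster("local[*]"))

// Truncated sample rows from the question, keeping the raw pipe-delimited line as the value.
val sample = sc.parallelize(Seq(
  ("key1", "Illinois|111|67342"),
  ("key1", "Illinois|121|67142"),
  ("key2", "Hawaii|113|67343"),
  ("key1", "Illinois|211|67442"),
  ("key3", "Hawaii|153|66343"),
  ("key3", "Ohio|193|68343")))

// One possible size function (an assumption): the UTF-8 byte length of the value.
// If the value is an Array[String] as in EDIT 1, sum over its fields instead.
def size(value: String): Int = value.getBytes("UTF-8").length

// (row count, total bytes) per key in a single pass.
val perKeyStats = sample.aggregateByKey((0, 0))(
  (accum, value) => (accum._1 + 1, accum._2 + size(value)),
  (first, second) => (first._1 + second._1, first._2 + second._2))

perKeyStats.collect().foreach(println)
// e.g. (key1,(3,54)), (key2,(1,16)), (key3,(2,30)) for these truncated values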

Justin Pihony
  • @sophie Kiiiind of...You should only collect once and use that data. The data may sit on the hot path, but you are still running the same DAG 3 times. – Justin Pihony Apr 24 '15 at 03:56

Assuming you have a pair RDD of (key, value).

You can get the keys and their counts using the below:

val rdd_res = rdd_inp.countByKey()

You can get the list of sizes for each key using the below:

val rdd_size_res = rdd_inp.groupByKey().map { case (a, b) => (a, size(b.toList)) }

def size(src: List[String]): List[String] = {
  // 32 + 2 * length roughly approximates the heap size of a String in the JVM,
  // returned here as a string for each element.
  src.map(a => (32 + a.length() * 2).toString())
}

Please check if the above works for your scenario.
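
If a single total per key is wanted rather than a list, the same 32 + 2 * length heuristic can be summed per key without the groupByKey shuffle (just a sketch, assuming rdd_inp is an RDD[(String, String)]); as the comments below note, this heuristic estimates in-memory size, not size on disk.

// Estimate each value's in-memory size with the same heuristic, then sum per key
// without shuffling all the values to one place.
val totalSizePerKey = rdd_inp
  .mapValues(v => (32 + v.length * 2).toLong)
  .reduceByKey(_ + _)

totalSizePerKey.collect().foreach(println)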

Karthik
  • Edited my post to show the new code. I used map instead of foreach since the latter returns a Unit. For the implementation of byteSize, I can't make yours work. I tried row._2.mkString(",").length * 2 + 32 but it doesn't match the disk size (577432 bytes vs 6.7MB). – sophie Apr 24 '15 at 01:12
  • I don't know how big the data set might be, but reduceByKey would be a faster option for the count. – Alister Lee Apr 24 '15 at 01:15
  • The RDD could contain 1 million rows; the distinct keys would be around 100. I'm not sure how to do this with reduceByKey since I need the distinct keys and 2 aggregates. – sophie Apr 24 '15 at 01:53
  • Sophie, the above-mentioned size method takes a list of strings for a particular key as input and returns back a list. The formula 32 + a.length() * 2 is normally used to get the size that a string occupies in the JVM. – Karthik Apr 24 '15 at 02:23
  • groupByKey followed by a map is inefficient... as the group shuffles everything all-to-all before the map, which then promptly loses it via the map – Justin Pihony Apr 24 '15 at 02:34
  • Yes, I agree ... aggregateByKey will be more efficient ... thanks for the advice :) – Karthik Apr 24 '15 at 03:10