
I have been searching for a solution for a long time but haven't found a correct algorithm.

Using Spark RDDs in Scala, how could I transform an RDD[(Key, Value)] into a Map[Key, RDD[Value]], knowing that I can't use collect or other methods that may load the data into memory?

In fact, my final goal is to loop over the Map[Key, RDD[Value]] by key and call saveAsNewAPIHadoopFile for each RDD[Value].

For example, if I get:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]

I'd like:

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]

I wonder whether it would cost too much to do this by calling filter on RDD[(Key, Value)] for each key A, B, C, but I don't know if calling filter as many times as there are distinct keys would be efficient? (Of course not, but maybe using cache would help?)

Thank you

Seb
    "knowing that I can't use collect or other methods which may load data into memory ?". This doesn't make sense. The resulting Map is going to have to fit in memory anyway. – The Archetypal Paul Jan 23 '15 at 14:08
  • Just a wild stab in the dark; wouldn't groupBy(...) give you something you can use? It should give you an RDD[key, Iterable[values]] – thoredge Jan 23 '15 at 14:32
  • @thoredge I'm not sure an Iterable would fit in memory for a very large amount of data, but indeed, depending on my input volume, this could be a solution – Seb Jan 23 '15 at 20:49

3 Answers


You could use code like this (Python):

# cache the input so the per-key filter passes below don't recompute it
rdd = sc.parallelize([("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]).cache()
# collect only the distinct keys to the driver, not the values
keys = rdd.keys().distinct().collect()
for key in keys:
    # one filter pass per key, keeping only the values
    out = rdd.filter(lambda x: x[0] == key).map(lambda x: x[1])
    out.saveAsNewAPIHadoopFile(...)

One RDD cannot be part of another RDD, so you have no option but to collect the keys and turn their related values into separate RDDs. In my example you iterate over the cached RDD, which is fine and should work reasonably fast.
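
For reference, a rough Scala sketch of the same per-key filter approach (not from the original answer; saveAsTextFile and the /tmp/out path are just illustrative stand-ins for the saveAsNewAPIHadoopFile call you actually need) could look like this:

val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6))).cache()
val keys = rdd.keys.distinct().collect()   // only the distinct keys are brought to the driver
for (key <- keys) {
  val out = rdd.filter { case (k, _) => k == key }.values   // one pass over the cached RDD per key
  out.saveAsTextFile(s"/tmp/out/$key")   // hypothetical path; stand-in for saveAsNewAPIHadoopFile(...)
}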

0x0FFF
  • I was not sure about the efficiency of filter, but I think this is the solution I will implement. – Seb Jan 23 '15 at 20:50
  • There is no ready-made transformation for your logic; I'm afraid that if you want something more efficient you have to implement it yourself – 0x0FFF Jan 24 '15 at 05:56
  • This is fundamentally a suboptimal solution. You can satisfy his end goal of writing to a separate file per key in one pass with a MultipleTextOutput. – Hamel Kothari Feb 11 '16 at 14:44
  • Agree, you can have another solution: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – 0x0FFF Feb 11 '16 at 14:51
  • Be aware when running this code in production: the collect action brings results back to the driver, which can run out of memory in no time. – Naveen Kumar Aug 15 '16 at 13:45
  • You run collect for keys only, and only once. If you have too many keys, you'll also have HDFS problems, not only memory issues – 0x0FFF Aug 15 '16 at 14:41

It sounds like what you really want is to save your KV RDD to a separate file for each key. Rather than creating a Map[Key, RDD[Value]], consider using a MultipleTextOutputFormat similar to the example here. The code is pretty much all there in the example.

The benefit of this approach is that you're guaranteed to take only one pass over the RDD after the shuffle, and you get the same result you wanted. If you did this by filtering and creating several RDDs as suggested in the other answer (unless your source supported pushdown filters), you would end up taking one pass over the dataset for each individual key, which would be much slower.
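
A minimal sketch of that idea, assuming String keys and the old mapred API (the class name, partition count, and output path below are illustrative, not taken from the linked example):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// route each record into a directory named after its key and drop the key from the output line
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}

val rdd = sc.parallelize(Seq(("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)))
rdd.map { case (k, v) => (k, v.toString) }
   .partitionBy(new HashPartitioner(3))   // single shuffle that groups records by key
   .saveAsHadoopFile("/tmp/out", classOf[String], classOf[String], classOf[KeyBasedOutputFormat])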

Hamel Kothari

This is my simple test code.

val test_RDD = sc.parallelize(List(("A",1), ("A",2), ("A",3), ("B",4), ("B",5), ("C",6)))
val groupby_RDD = test_RDD.groupByKey()
// build a List out of each group's Iterable; prepending with ::= does not preserve the input order
val result_RDD = groupby_RDD.map { v =>
    var result_list: List[Int] = Nil
    for (i <- v._2) {
        result_list ::= i
    }
    (v._1, result_list)
}

The result is below:

result_RDD.take(3)
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))
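
If you don't need the explicit loop, a more compact equivalent of the same groupByKey approach (my own sketch, not part of the original answer) is:

val result_RDD = groupby_RDD.mapValues(_.toList)   // same RDD[(String, List[Int])] in one line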

Or you can do it like this:

val test_RDD = sc.parallelize(List(("A",1), ("A",2), ("A",3), ("B",4), ("B",5), ("C",6)))
val nil_list: List[Int] = Nil
// empty list as the zero value; prepend values within each partition, then concatenate the partial lists
val result2 = test_RDD.aggregateByKey(nil_list)(
    (acc, value) => value :: acc,
    (acc1, acc2) => acc1 ::: acc2)

The result is this:

result2.take(3)
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))