4

If I do the basic groupByKey operation on a JavaRdd<Tuple2<String, String>>, I get a JavaPairRdd<Tuple2<String, Iterable<String>>>:

someStartRdd.groupByKey()

because the size of the iterables in every tuple is going to be quite big (millions) and the number of keys is going to be big too, I'd like to process each iterable in a streaming parallel fashion like with RDD. Ideally I'd like an RDD per key.

For the moment the only thing I could think of is to collect, create lists and then parallelize:

List<Tuple2<String, Iterable<String>>> r1 = someStartRdd.groupByKey().collect();
for (Tuple2<String, Iterable<String>> tuple : r1){
    List<String> listForKey = MagicLibrary.iterableToString(tuple._2());
    JavaRdd<String> listRDD = sparkContext.parallelize(listForKey);
    ...start job on listRDD...
}

but I don't want to put all things in memory to create the list. Better solution?

gotch4
  • 13,093
  • 29
  • 107
  • 170

2 Answers2

3

If you have a large number of keys and a large number of values per key you're pretty much out of luck. Spark works best on long and narrow data and the only robust way to split RDD into multiple RDDs is to apply iterative filtering. You'll find an explanation why here: How to split a RDD into two or more RDDs?

Another approach, described in Scala Spark: Split collection into several RDD?, is to explicitly group data but since it requires non lazy evaluation it is unlikely to work with a large number of keys.

Finally, repartitioning may not work due to 2GB limits, data skews, and overall cost of large shuffles.

Keeping all of that in mind the possible strategy is to try to build your algorithms in a way that leverages without explicitly moving data around unless it is necessary. There are multiple methods you can use including sampling, salting and different approximations.

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
1

You can try the following solution, although I would recommend against it since it implies a lot of shuffle operations, but would achieve your goal of processing each key "iterable in a streaming parallel fashion like with RDD. Ideally I'd like an RDD per key."

 List<String> keys = someStartRdd.keys().distinct().collect();
 HashMap<String,Integer> keysHash = new HashMap<String,Integer>();
 int pos = 0;
 for (String key : keys){
     keysHash.put(key,pos++);
 }
 repartitionedRDD = 
            someStartRdd.repartitionAndSortWithinPartitions(    
                  new CustomPartitioner(keysHash),//Partition your RDD
                  new CustomComparator()) //Sort by key the output

with CustomPartinioer as this

     public static class CustomPartitioner extends Partitioner implements Serializable
{
    private static final long serialVersionUID = 1L;
    private HashMap<String,Integer> keysHash;
    public CustomPartitioner(HashMap<String,Integer> keysHash){
        this.keysHash = keysHash 
    }

    @Override
    public int getPartition(Object key) {
        return ((int) hashKeys.get((String) key);
    }

    @Override
    public int numPartitions() {
        return hashKeys.size();
    }       
} 

Afterwards you can process in a "in a streaming parallel fashion" like this

repartitionedRDD.groupByKey().mapPartitions(new FlatMapFunction ...)
Radu Ionescu
  • 3,462
  • 5
  • 24
  • 43