
I have two RDDs, one really large and the other much smaller. I'd like to find all unique tuples in the large RDD whose keys appear in the small RDD.

  • The large RDD is so large that I have to avoid a full shuffle.
  • The small RDD is also large enough that I can't broadcast it, though I may be able to broadcast its keys.
  • There are duplicate tuples as well, and I only care about the distinct ones.

For example

large_rdd = sc.parallelize([('abcdefghij'[i%10], i) for i in range(100)] * 5)
small_rdd = sc.parallelize([('zab'[i%3], i) for i in range(10)])
expected_rdd = [
    ('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
    ('b',   [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]

There are two expensive operations in my solution - join and distinct. I assume both cause a full shuffle and leave the child RDD hash partitioned. Given that, is the following the best I can do?

keys = sc.broadcast(small_rdd.keys().distinct().collect())

filtered_unique_large_rdd = (large_rdd
    .filter(lambda (k, v): k in keys.value)
    .distinct()
    .groupByKey())

(filtered_unique_large_rdd
     .join(small_rdd.groupByKey())
     .mapValues(lambda x: sum([list(i) for i in x], []))
     .collect())

Basically, I filter the tuples explicitly, pick the distinct ones, and then join with small_rdd. I hope that the distinct operation will leave the keys hash partitioned and will not cause another shuffle during the subsequent join.

Thanks in advance for any suggestions/ideas.

PS: It is not a duplicate of "Which function in spark is used to combine two RDDs by keys", since a plain join (with its full shuffle) is already an option here.

zonked.zonda
  • You might also save some time by using Dataframes or Scala. – Reactormonk Dec 07 '15 at 17:17
  • Sounds like the best solution to me, given the strong constraints. Everything would be better if you could hash-partition the large RDD at the start. – Daniel Darabos Dec 07 '15 at 18:07
  • @DanielDarabos Good point. Since I'm interested in distinct values by each key, I can do .distinct().groupByKey() and hope that first distinct will hash partition and second groupByKey won't require a shuffle? – zonked.zonda Dec 07 '15 at 18:25

1 Answer


There are two expensive operations in my solution - join and distinct.

Actually there are three expensive operations. You should add groupByKey to the list.

I hope that the distinct operation will leave the keys hash partitioned and will not cause another shuffle during the subsequent join.

distinct won't, but the subsequent groupByKey will. The problem is that it requires your data to be shuffled twice - once for distinct and once for groupByKey.

filtered_unique_large_rdd.toDebugString()

## (8) PythonRDD[27] at RDD at PythonRDD.scala:43 []
##  |  MapPartitionsRDD[26] at mapPartitions at PythonRDD.scala:374 []
##  |  ShuffledRDD[25] at partitionBy at NativeMethodAccessorImpl.java:-2 []
##  +-(8) PairwiseRDD[24] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
##     |  PythonRDD[23] at groupByKey at <ipython-input-11-8a3af1a8d06b>:2 []
##     |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:374 []
##     |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:-2 []
##     +-(8) PairwiseRDD[20] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
##        |  PythonRDD[19] at distinct at <ipython-input-11-8a3af1a8d06b>:2 []
##        |  ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []

You can try to replace distinct followed by groupByKey with aggregateByKey:

zeroValue = set()

def seqFunc(acc, x):
    acc.add(x)
    return acc

def combFunc(acc1, acc2):
    acc1.update(acc2)
    return acc1

grouped_by_aggregate = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)
    .aggregateByKey(zeroValue, seqFunc, combFunc))

Compared to your current solution it has to shuffle large_rdd only once:

grouped_by_aggregate.toDebugString()

## (8) PythonRDD[54] at RDD at PythonRDD.scala:43 []
##  |  MapPartitionsRDD[53] at mapPartitions at PythonRDD.scala:374
##  |  ShuffledRDD[52] at partitionBy at NativeMethodAccessorImpl.java:-2 []
##  +-(8) PairwiseRDD[51] at aggregateByKey at <ipython-input-60-67c93b2860a0 ...
##     |  PythonRDD[50] at aggregateByKey at <ipython-input-60-67c93b2860a0> ...
##     |  ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:423 []
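For completeness, a rough sketch of how the aggregated result could be plugged back into the rest of the original pipeline (the variable names follow the snippets above; the sorted call is only there to make the output easier to read and is not required):

final_rdd = (grouped_by_aggregate
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: sorted(x[0]) + list(x[1])))

final_rdd.collect()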

Another possible improvement is to convert the keys to a set before broadcasting:

keys = sc.broadcast(set(small_rdd.keys().distinct().collect()))

Right now your code performs a linear search over the broadcast list for every record the filter checks.
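A standalone illustration of why that matters (hypothetical sizes, not taken from the question; absolute timings will of course vary):

import timeit

candidate_list = list(range(100000))   # membership test scans the list element by element (O(n))
candidate_set = set(candidate_list)    # membership test is an average O(1) hash lookup

print(timeit.timeit(lambda: 99999 in candidate_list, number=1000))
print(timeit.timeit(lambda: 99999 in candidate_set, number=1000))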

zero323
  • I already have this solution in place, but I'm trying to reduce Python code and utilize the underlying Spark to improve performance. Wouldn't distinct also shuffle by hash (very similar to how aggregateByKey(set_functions) would)? – zonked.zonda Dec 07 '15 at 18:41
  • Yes, but on different keys (`hash((k, v))`) and __it is implemented in Python__. The second part is not a problem, but the first one is. – zero323 Dec 07 '15 at 18:42
  • Good point. Besides that, is there a way we can avoid filter and use join instead? Filter places a limit on how big the small_rdd can be. I'd have liked to do a join but limit the shuffle to only those keys that need to be in the final RDD. – zonked.zonda Dec 08 '15 at 18:22
  • When it comes to the final output you can safely drop the filter; `join` alone covers this part of the logic. The only reason to keep it is to reduce shuffling (especially if number-of-unique-keys-in-small_rdd << number-of-unique-keys-in-large_rdd), but it is quite expensive by itself. See my answer to [Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark](http://stackoverflow.com/q/34139049/1560062) – zero323 Dec 08 '15 at 18:26
  • If you can accept some loss of information you could use a bloom filter. – zero323 Dec 08 '15 at 18:33
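A very rough, self-contained sketch of the bloom-filter idea from that last comment, using a hand-rolled filter (all names, sizes, and hash choices here are made up for illustration; in practice a library implementation would be used). False positives only let a few extra keys through, and the later join drops them anyway:

import hashlib

# Made-up parameters: a real deployment would size these from the expected
# number of keys and the acceptable false-positive rate.
NUM_BITS = 1 << 20
NUM_HASHES = 3

def bloom_positions(key):
    # Derive NUM_HASHES pseudo-independent bit positions from the key.
    digest = hashlib.md5(str(key).encode('utf-8')).hexdigest()
    return [int(digest[i * 8:(i + 1) * 8], 16) % NUM_BITS for i in range(NUM_HASHES)]

# Build the filter on the driver from the distinct small keys.
bits = set()  # stand-in for a real fixed-size bit array
for k in small_rdd.keys().distinct().collect():
    bits.update(bloom_positions(k))

bloom_bits = sc.broadcast(bits)

def probably_in_small(key):
    # May return True for a key that is not in small_rdd (false positive),
    # but never False for one that is.
    return all(p in bloom_bits.value for p in bloom_positions(key))

approx_filtered_large = large_rdd.filter(lambda kv: probably_in_small(kv[0]))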