I have two RDDs, one really large and the other much smaller. I'd like to find all unique tuples in the large RDD whose keys appear in the small RDD.
- The large RDD is so large that I have to avoid a full shuffle.
- The small RDD is also large enough that I can't broadcast it, though I may be able to broadcast its keys.
- There are duplicate tuples as well, and I only care about the distinct ones.
For example:
large_rdd = sc.parallelize([('abcdefghij'[i % 10], i) for i in range(100)] * 5)  # 500 tuples; each distinct tuple appears 5 times
small_rdd = sc.parallelize([('zab'[i % 3], i) for i in range(10)])  # 10 tuples with keys 'z', 'a', 'b'
expected_rdd = [
    ('a', [1, 4, 7, 0, 10, 20, 30, 40, 50, 60, 70, 80, 90]),
    ('b', [2, 5, 8, 1, 11, 21, 31, 41, 51, 61, 71, 81, 91])]
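(A quick way to sanity-check a candidate result against this expected output, assuming the ordering of the values inside each list does not matter, could be something like the snippet below; normalize and actual_result are just placeholder names.)

def normalize(pairs):
    # sort the keys and the values within each key, since the RDD
    # operations do not guarantee any particular ordering
    return sorted((k, sorted(vals)) for k, vals in pairs)

# actual_result = <collected list of (key, values) pairs from a candidate solution>
# assert normalize(actual_result) == normalize(expected_rdd)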
There are two expensive operations in my solution: join and distinct. I assume both cause a full shuffle and leave the child RDD hash-partitioned. Given that, is the following the best I can do?
keys = sc.broadcast(small_rdd.keys().distinct().collect())  # broadcast only the distinct keys of the small RDD
filtered_unique_large_rdd = (large_rdd
    .filter(lambda kv: kv[0] in keys.value)   # keep tuples whose key occurs in the small RDD
    .distinct()                               # drop duplicate tuples
    .groupByKey())
(filtered_unique_large_rdd
    .join(small_rdd.groupByKey())
    .mapValues(lambda x: sum([list(i) for i in x], []))  # concatenate the two value iterables
    .collect())
Basically, I filter the tuples explicitly, take the distinct ones and then join with small_rdd. I hope that the distinct operation leaves the keys hash-partitioned, so the subsequent join will not cause another shuffle.
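For reference, a minimal sketch of how that assumption could be checked, by looking at each side's partitioner attribute and at the join's lineage (this assumes the RDDs defined above; grouped_small is just a name introduced here, and the exact toDebugString output varies across Spark versions):

grouped_small = small_rdd.groupByKey()

# partitioner should be non-None after groupByKey; None would mean the data
# is not hash-partitioned by key at this point
print(filtered_unique_large_rdd.partitioner)
print(grouped_small.partitioner)

# the lineage shows whether the join adds another shuffle stage on top
joined = filtered_unique_large_rdd.join(grouped_small)
print(joined.toDebugString())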
Thanks in advance for any suggestions/ideas.
PS: This is not a duplicate of "Which function in spark is used to combine two RDDs by keys", since there a join (full shuffle) is an acceptable option.