
I have a list of members, which have many attributes, two of which are a name and an ID. I wish to get a list of tuples in an RDD, where each tuple contains the ID as the first element and the number of unique names associated with that ID as the second element.

e.g.: (ID, <# of unique names associated with ID>).

Here's the code that I have written to accomplish this:

IDnametuple = members.map(lambda a: (a.ID, a.name))   # extract only ID and name
idnamelist = IDnametuple.groupByKey()                 # group the names by ID
idnameunique_count = (idnamelist
     # set(tup[1]) should extract the unique names,
     # and len should count them
    .map(lambda tup: (tup[0], len(set(tup[1])))))

It is incredibly slow, and much slower than similar operations that count unique attributes for each member.

Is there a quicker way to do this? I tried to use as many built-ins as possible, which, from what I've heard, is the right way to speed things up.

– makansij

2 Answers


Without any details we can only guess, but the obvious suspect is groupByKey. If each ID is associated with a large number of names it can be pretty expensive due to the extensive shuffling. The simplest improvement is to use aggregateByKey or combineByKey:

def create_combiner(x):
    # start the accumulator as a one-element set holding the first name
    return {x}

def merge_value(acc, x):
    # add a single name to the per-partition set
    acc.add(x)
    return acc

def merge_combiners(acc1, acc2):
    # merge the sets built on different partitions
    acc1.update(acc2)
    return acc1

id_name_unique_count = (id_name_tuple  # IDnametuple from the question, renamed for consistency
  .combineByKey(create_combiner, merge_value, merge_combiners)
  .mapValues(len))
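
For completeness, here is a minimal sketch of the aggregateByKey variant mentioned above (my own addition, reusing merge_value and merge_combiners from the snippet; PySpark makes a fresh copy of the empty-set zero value for each key, so mutating it is safe):

# Sketch only: aggregateByKey equivalent of the combineByKey version above.
id_name_unique_count = (id_name_tuple
  .aggregateByKey(set(), merge_value, merge_combiners)  # zero value: a fresh empty set per key
  .mapValues(len))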

If the expected number of unique values is large, you may prefer to replace the exact approach with an approximation. One possible approach is to use a Bloom filter to keep track of unique values instead of a set.
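
The answer only names the approximate idea; below is a rough, illustrative sketch of one way to do it with a fixed-size bitmap and linear counting (a close relative of the Bloom-filter approach, not the answer's exact method). NUM_BITS and all helper names here are made up for illustration, and id_name_tuple is the (ID, name) RDD from the snippet above:

import math
import hashlib

NUM_BITS = 1 << 12  # fixed sketch size per key; an arbitrary illustrative choice

def add_to_sketch(bits, name):
    # hash the name to a single bit position and set it
    h = int(hashlib.md5(str(name).encode("utf-8")).hexdigest(), 16) % NUM_BITS
    bits[h // 8] |= 1 << (h % 8)
    return bits

def create_sketch(name):
    # start from an all-zero bitmap and record the first name
    return add_to_sketch(bytearray(NUM_BITS // 8), name)

def merge_sketches(b1, b2):
    # bitwise OR keeps the sketch mergeable across partitions
    for i, byte in enumerate(b2):
        b1[i] |= byte
    return b1

def estimate_count(bits):
    # linear counting: n is approximately -m * ln(fraction of zero bits)
    zero_bits = sum(8 - bin(b).count("1") for b in bits)
    if zero_bits == 0:
        return NUM_BITS  # sketch is saturated; this is only a lower bound
    return int(round(-NUM_BITS * math.log(float(zero_bits) / NUM_BITS)))

approx_unique_count = (id_name_tuple
    .combineByKey(create_sketch, add_to_sketch, merge_sketches)
    .mapValues(estimate_count))

The per-key memory is now constant (512 bytes here) no matter how many distinct names an ID has, at the cost of an approximate answer.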

For additional information regarding groupByKey vs aggregateByKey (reduceByKey, combineByKey), see the "Working with Key-Value Pairs" section of the Spark programming guide linked in the other answer.

– zero323

That is basically the word count example of https://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs, but counting distinct key-value pairs:

from operator import add

IDnametuple = sc.parallelize([(0, "a"), (0, "a"), (0, "b"),
                              (1, "a"), (1, "b"), (1, "c"), (2, "z")])
idnameunique_count = (IDnametuple.distinct()                        # drop duplicate (ID, name) pairs
                                 .map(lambda idName: (idName[0], 1))
                                 .reduceByKey(add))

So idnameunique_count.collect() returns [(0, 2), (1, 3), (2, 1)], where (0, "a") is counted only once. As @zero323 mentioned, here the key is replacing groupByKey with reduceByKey in order to avoid creating the intermediate list of names. All you need is the name count, which is a much smaller object than a potentially huge list. Also, your version uses set() to eliminate duplicates sequentially in the closure code, while distinct is executed as a distributed, parallel RDD transformation.
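
Applied to the RDD from the question, the same pattern would look roughly like this (a sketch, assuming the members RDD with .ID and .name attributes described by the asker):

from operator import add

id_name_unique_count = (members
    .map(lambda a: (a.ID, a.name))          # keep only the two fields we need
    .distinct()                             # drop duplicate (ID, name) pairs in parallel
    .map(lambda id_name: (id_name[0], 1))   # one ticket per distinct name
    .reduceByKey(add))                      # sum the tickets per ID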

– juanrh0011
  • There is one problem with this approach - it has to shuffle twice: once to get `distinct` values, and once to `reduceByKey`. Regarding parallelism... unless the number of keys is comparable to the number of available cores, it is exactly the same as when you use a set on grouped data. One way or another each partition is processed sequentially. – zero323 Nov 09 '15 at 09:06
  • By using a set as an accumulator you can only handle datasets where the set of unique values for a key fits in the memory of a single worker. With my solution you don't have that limitation, because you only store a number per key, as described in "Avoid GroupByKey". Your solution is basically a reimplementation of groupByKey using a set. I don't understand your comment about the relationship between keys and cores. – juanrh0011 Nov 12 '15 at 05:08
  • There is exactly that limitation. To perform `distinct` you have to `reduceByKey`, which helps only if the number of duplicates is large. In such a case `combineByKey` should already reduce the amount of data. Otherwise the first potential point of failure is shuffling and storing `((k, v), null)` pairs, which have to fit into memory. Next you perform another shuffle, which on average has to shuffle most of the data once again. This is the part where you actually gain by counting. Regarding the key-cores relationship: – zero323 Nov 12 '15 at 05:35
  • Every operation in Spark is by default sequential per partition. So if the number of hashed values is large enough to fill all partitions, you don't improve parallelism. – zero323 Nov 12 '15 at 05:35
  • The point is that those `((k, v), null)` pairs don't need to fit in the memory of a single worker, because they can reside in several partitions, which can be hosted on several workers. If you don't have enough partitions you can repartition; you don't need to have all the partitions in memory at the same time, and you can also add more workers. That's why the solution is horizontally scalable, also in memory. – juanrh0011 Nov 13 '15 at 03:44
  • Actually, all unique `((k, v), null)` pairs for a given `(k, v).## % numberOfPartitions` have to reside in the memory of a single worker. And this, combined with shuffling, can be a bottleneck. Just to be clear, I am not saying this is a bad approach, but it comes at a price. – zero323 Nov 13 '15 at 03:53