I'm new to pyspark. I have a Pair RDD (key, value). I would like to create a histogram of n buckets for each key. The output would be something like this:

[(key1, [...buckets...], [...counts...]),
 (key2, [...buckets...], [...counts...])]

I have seen examples for retrieving the max value or the sum of each key, but is there a way to pass the histogram(n) function to be applied to each key's values?

2 Answers

I know this post is rather old, but for people still looking for a PySpark solution, here are my two cents on the issue.

Let us consider a (key, value) pair RDD, and let's say that by "histogram" we mean a plain counter of the distinct values for each key, along with how many times each of them occurs.

aggregateByKey() is a good way to go. It takes three arguments: the accumulator's default (zero) value, the within-partition aggregation function, and the between-partition aggregation function.
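As a quick illustration of those three arguments, here is a minimal per-key sum (a sketch; sc is assumed to be an existing SparkContext):

pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
sums = pairs.aggregateByKey(
    0,                       # zero value: each key's accumulator starts at 0
    lambda acc, v: acc + v,  # within a partition: fold one value into the accumulator
    lambda a, b: a + b,      # across partitions: merge two accumulators
)
sums.collect()  # [('a', 3), ('b', 3)]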

Suppose we have an RDD of the form

[(124, 2),
 (124, 2),
 (124, 2),
 (125, 2),
 (125, 2),
 (125, 2),
 (126, 2),
 (126, 2),
 (126, 2),
 (127, 2),
 (127, 2),
 (127, 2),
 (128, 2),
 (128, 2),
 (128, 2),
 (129, 2),
 (129, 2),
 (129, 2),
 (130, 2),
 (130, 2),
 (130, 2),
 (131, 2),
 (131, 2),
 (131, 2),
 (132, 2),
 (132, 2),
 (132, 2),
 (133, 2),
 (133, 2),
 (133, 2),
 (134, 2),
 (134, 2),
 (134, 2),
 (135, 2),
 (135, 2),
 (135, 2),
 (136, 2),
 (136, 1),
 (136, 2),
 (137, 2),
 (137, 2),
 (137, 2),
 (138, 2),
 (138, 2),
 (138, 2),
 (139, 2),
 (139, 2),
 (139, 2),
 (140, 2),
 (140, 2),
 (140, 2),
 (141, 2),
 (141, 1),
 (141, 1),
 (142, 2),
 (142, 2),
 (142, 2),
 (143, 2),
 (143, 2),
 (143, 2),
 (144, 1),
 (144, 1),
 (144, 2),
 (145, 1),
 (145, 1),
 (145, 1),
 (146, 2),
 (146, 2),
 (146, 2),
 (147, 2),
 (147, 2),
 (147, 2),
 (148, 2),
 (148, 2),
 (148, 2),
 (149, 2),
 (149, 2),
 (149, 2),
 (150, 2),
 (150, 2),
 (150, 2),
 (151, 2),
 (151, 2),
 (151, 2),
 (152, 2),
 (152, 2),
 (152, 2),
 (153, 2),
 (153, 1),
 (153, 2),
 (154, 2),
 (154, 2),
 (154, 2),
 (155, 2),
 (155, 1),
 (155, 2),
 (156, 2),
 (156, 2),
 (156, 2),
 (157, 1),
 (157, 2),
 (157, 2),
 (158, 2),
 (158, 2),
 (158, 2),
 (159, 2),
 (159, 2),
 (159, 2),
 (160, 2),
 (160, 2),
 (160, 2),
 (161, 2),
 (161, 1),
 (161, 2),
 (162, 2),
 (162, 2),
 (162, 2),
 (163, 2),
 (163, 1),
 (163, 2),
 (164, 2),
 (164, 2),
 (164, 2),
 (165, 2),
 (165, 2),
 (165, 2),
 (166, 2),
 (166, 1),
 (166, 2),
 (167, 2),
 (167, 2),
 (167, 2),
 (168, 2),
 (168, 1),
 (168, 1),
 (169, 2),
 (169, 2),
 (169, 2),
 (170, 2),
 (170, 2),
 (170, 2),
 (171, 2),
 (171, 2),
 (171, 2),
 (172, 2),
 (172, 2),
 (172, 2),
 (173, 2),
 (173, 2),
 (173, 1),
 (174, 2),
 (174, 1),
 (174, 1),
 (175, 1),
 (175, 1),
 (175, 1),
 (176, 1),
 (176, 1),
 (176, 1),
 (177, 2),
 (177, 2),
 (177, 2)]

To the best of my knowledge, the easiest way to do this is to aggregate each key's values into a Python dictionary, where each dictionary key is an RDD value and the associated dictionary value counts how many times that RDD value occurs. The RDD keys need no special handling, as the aggregateByKey() function handles them automatically.

The aggregation call has the form

myRDD.aggregateByKey(dict(), withinPartition, betweenPartition)

where we initialise all the accumulators as empty dictionaries.

The within-partition aggregation function thus has the following form

def withinPartition(dictionary, record):
    # Count one more occurrence of this value in the per-key dictionary.
    if record in dictionary:
        dictionary[record] += 1
    else:
        dictionary[record] = 1
    return dictionary

where dictionary is the per-key value counter and record is a single RDD value (an integer, in this example; see the RDD above). If a given RDD value already exists in the dictionary, we increment its counter by one; otherwise, we initialise it to one.
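For instance, folding a few records into an initially empty dictionary on the driver (a quick local check, outside Spark):

d = dict()
for record in [2, 2, 1]:
    d = withinPartition(d, record)
print(d)  # {2: 2, 1: 1}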

The between-partition function works in much the same way

def betweenPartition(dictionary1, dictionary2):
    # Merge two per-key counters by summing counts over the union of their keys.
    return {k: dictionary1.get(k, 0) + dictionary2.get(k, 0)
            for k in set(dictionary1) | set(dictionary2)}

Basically, for a given RDD key we may end up with two partial dictionaries, one per partition. We merge them into a single dictionary by summing the counts of any key present in both, and carrying over the keys that exist in only one of them (hence the set union). Credits to georg's solution in this post for the dictionary merging.
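For example, merging two partial counters locally:

betweenPartition({1: 2, 2: 1}, {2: 2, 3: 1})  # {1: 2, 2: 3, 3: 1}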

The resulting RDD will have the form

[(162, {2: 3}),
 (132, {2: 3}),
 (168, {1: 2, 2: 1}),
 (138, {2: 3}),
 (174, {1: 2, 2: 1}),
 (144, {1: 2, 2: 1}),
 (150, {2: 3}),
 (156, {2: 3}),
 (126, {2: 3}),
 (163, {1: 1, 2: 2}),
 (133, {2: 3}),
 (169, {2: 3}),
 (139, {2: 3}),
 (175, {1: 3}),
 (145, {1: 3}),
 (151, {2: 3}),
 (157, {1: 1, 2: 2}),
 (127, {2: 3}),
 (128, {2: 3}),
 (164, {2: 3}),
 (134, {2: 3}),
 (170, {2: 3}),
 (140, {2: 3}),
 (176, {1: 3}),
 (146, {2: 3}),
 (152, {2: 3}),
 (158, {2: 3}),
 (129, {2: 3}),
 (165, {2: 3}),
 (135, {2: 3}),
 (171, {2: 3}),
 (141, {1: 2, 2: 1}),
 (177, {2: 3}),
 (147, {2: 3}),
 (153, {1: 1, 2: 2}),
 (159, {2: 3}),
 (160, {2: 3}),
 (130, {2: 3}),
 (166, {1: 1, 2: 2}),
 (136, {1: 1, 2: 2}),
 (172, {2: 3}),
 (142, {2: 3}),
 (148, {2: 3}),
 (154, {2: 3}),
 (124, {2: 3}),
 (161, {1: 1, 2: 2}),
 (131, {2: 3}),
 (167, {2: 3}),
 (137, {2: 3}),
 (173, {1: 1, 2: 2}),
 (143, {2: 3}),
 (149, {2: 3}),
 (155, {1: 1, 2: 2}),
 (125, {2: 3})]

The original RDD keys are preserved in this new RDD. Each new RDD value is a dictionary; in turn, each dictionary key corresponds to one of the possible RDD values, and each dictionary value counts how many times that RDD value occurs for the given RDD key.
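If you then need the (key, [...buckets...], [...counts...]) layout from the question, the per-key dictionaries can be reshaped accordingly (a sketch; aggregated is an illustrative name for the result of the aggregateByKey() call above):

aggregated = myRDD.aggregateByKey(dict(), withinPartition, betweenPartition)
histograms = aggregated.map(
    lambda kv: (kv[0],
                sorted(kv[1]),                      # "buckets": the distinct values, sorted
                [kv[1][b] for b in sorted(kv[1])])  # counts, aligned with the buckets
)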

Alternatively, group the values by key and apply NumPy's histogram:

>>> import numpy as np
>>>
>>> rdd.groupByKey().mapValues(lambda values: np.histogram(list(values)))
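Note that np.histogram() returns a (counts, bin_edges) pair and defaults to 10 bins. To get n buckets and the (key, [...buckets...], [...counts...]) layout from the question, a sketch along these lines should work (n and per_key_hist are illustrative names; also note that groupByKey() materialises all of a key's values on a single executor, which can be costly for skewed keys):

>>> n = 5  # desired number of buckets (illustrative)
>>> def per_key_hist(kv):
...     key, values = kv
...     counts, edges = np.histogram(list(values), bins=n)
...     return (key, list(edges), list(counts))
...
>>> rdd.groupByKey().map(per_key_hist).collect()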