
My quantized data (about 100m in size) looks like this:

(1424411938, [3885, 7898])
(3333333333, [3885, 7898])

Desired result:

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

So what I want is to transform the data so that I group 3885 (for example) with all the point IDs (data[0]) whose lists contain it. Here is what I did:

def prepare(data):
    result = []
    for point_id, cluster in data:
        for c in cluster:
            # check whether there is already an entry for this cluster id
            found = 0
            for res in result:
                if c == res[0]:
                    found = 1
            # if not, create one
            if found == 0:
                result.append((c, []))
            # append the point_id to the matching entry
            for res in result:
                if c == res[0]:
                    res[1].append(point_id)
    return result

but when I ran prepare() over the data RDD with mapPartitions(), it seems to do what I want only within the current partition, and thus returns a bigger result than desired.
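Roughly, the call looks like this (a minimal sketch; per_partition is just an illustrative name):

# prepare() gets an iterator over one partition's records and returns a list,
# so mapPartitions() yields a separate (cluster, [point_ids]) list per partition
per_partition = data.mapPartitions(prepare)
per_partition.collect()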

For example, if the 1st record above ended up in the 1st partition and the 2nd record in the 2nd partition, then I would get this result:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

How can I modify my prepare() to get the desired effect? Alternatively, how can I process the result that prepare() produces, so that I get the desired result?


As you may already have noticed from the code, I do not care about speed at all.

Here is a way to create the data:

# build 10 random (point_id, (cluster, cluster)) records and parallelize them
data = []
from random import randint
for i in xrange(0, 10):
    data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)
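As a side note (not part of my original code), you can check how many partitions the RDD ends up with, since prepare() runs once per partition:

data.getNumPartitions()  # prepare() produces one independent result per partition
# passing numSlices to sc.parallelize(...) above would control this explicitly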

1 Answer


You can use a bunch of basic pyspark transformations to achieve this.

>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))

We used flatMap to emit a (key, value) pair for every item in x[1], changing each record's format to (a, x[0]), where a is each item of x[1]. To understand flatMap better, you can look at the documentation.
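Collecting r at this point should give something like the following (ordering may differ):

>>> r.collect()
[(3885, 1424411938), (7898, 1424411938), (3885, 3333333333), (7898, 3333333333)]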

>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))

We just grouped all (key, value) pairs by their keys and used the tuple function to convert the resulting iterable into a tuple.

>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]

As you said, you can use [:150] to keep the first 150 elements; I guess this would be the proper usage:

r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))

I tried to be as explanatory as possible. I hope this helps.

  • Oops, I thought I had commented. Could you please explain this a bit? For example, in my real application I want to keep only 150 items of every list, so I guess I could just slice the `tuple(x[1])`. The code is still running, so I don't know if it will do the trick... :) – gsamaras Sep 09 '16 at 00:20
  • Thank you, this will help the future users too, not just me! I will wait for the code to execute and then act. :) Delete the comment under my question, no need for it (I will edit that too when you do it) :) – gsamaras Sep 09 '16 at 00:34
  • I see the same behavior as in [Active tasks is a negative number spark ui](http://stackoverflow.com/questions/38964007/active-tasks-is-a-negative-number-spark-ui). I wonder if the problem lies on the code or something weird happen on the cluster... – gsamaras Sep 09 '16 at 01:04
  • The code was OK, my partitions were many, I updated the answer in that link, thanks paradosko. – gsamaras Sep 09 '16 at 01:16