
I am relatively new to Apache Spark and Python, and I was wondering whether something like what I am about to describe is doable.

I have an RDD of the form [m1, m2, m3, m4, m5, m6, ..., mn] (you get this when you run rdd.collect()). I was wondering if it is possible to transform this RDD into another RDD of the form [(m1, m2, m3), (m4, m5, m6), ..., (mn-2, mn-1, mn)]. The inner tuples should be of size k. If n is not divisible by k, then one of the tuples should have fewer than k elements.

I tried using the map function but was not able to get the desired output. It seems that the map function can only return an RDD with the same number of elements as the RDD that was initially provided.
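
For illustration, a minimal sketch of the behaviour I mean (the tiny example RDD is just a placeholder):

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# map() is one-to-one: the output has exactly as many elements as the input
print(rdd.map(lambda x: x * 2).count())   # 6, same as rdd.count()

# glom() / mapPartitions() are not one-to-one: they emit one record per partition
print(rdd.glom().count())                 # one element per partition, not per input record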

UPDATE: I tried using partitions and was also able to get it to work.

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])
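
A hedged variant of the same partition-based idea, keyed on each element's position so that every partition ends up holding one contiguous group (this is my own sketch; rdd, n and k are assumed to be defined as above):

import math

num_parts = int(math.ceil(float(n) / k))
grouped = (rdd.zipWithIndex()                                  # (element, index)
              .map(lambda xi: (xi[1], xi[0]))                  # (index, element), index becomes the key
              .partitionBy(num_parts, lambda idx: idx // k)    # partition p receives indices p*k .. p*k + k-1
              .glom()                                          # one list of (index, element) per partition
              .map(lambda part: tuple(x[1] for x in sorted(part))))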

3 Answers


Olologin's answer almost has it but I believe what you are trying to do is group your RDD into 3-tuples instead of grouping your RDD into 3 groups of tuples. To do the former, try the following:

rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
transformed = (rdd.zipWithIndex()
                  .groupBy(lambda kv: kv[1] // 3)
                  .map(lambda kv: tuple(x[0] for x in kv[1])))

When run in pyspark, I get the following:

>>> from __future__ import print_function    
>>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
>>> transformed = rdd.zipWithIndex().groupBy(lambda kv: kv[1] // 3).map(lambda kv: tuple(x[0] for x in kv[1]))
>>> transformed.foreach(print)
...
('e4', 'e5', 'e6')
('e10',)
('e7', 'e8', 'e9')
('e1', 'e2', 'e3')
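
Note that the tuples above come back in no particular order, because foreach runs on the executors. A small follow-up sketch (my addition, not part of the original answer) that keeps the group index so the result can be sorted back into the original order:

ordered = (rdd.zipWithIndex()
              .groupBy(lambda kv: kv[1] // 3)
              .map(lambda kv: (kv[0], tuple(x[0] for x in sorted(kv[1], key=lambda p: p[1]))))
              .sortByKey()
              .values())
print(ordered.collect())
# [('e1', 'e2', 'e3'), ('e4', 'e5', 'e6'), ('e7', 'e8', 'e9'), ('e10',)]
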
– Rohan Aletty
  • Yes, that was what I was trying to do. Your solution works. I also tried doing this using partitionBy() and glom() and was also able to get it to work. – thisisshantzz Oct 12 '15 at 02:16

I assume that you are using the PySpark API. I don't know if this is the best possible solution, but I think it can be done with zipWithIndex, groupBy and a simple map.

# 3 - your grouping k
# kv_ci - (group key, iterable of (char, idx) tuples)
rdd = sc.parallelize(["a", "b", "c", "d", "e"]).zipWithIndex()\
        .groupBy(lambda char_idx: char_idx[1] // 3)\
        .map(lambda kv_ci: tuple(char for char, idx in kv_ci[1]))\
        .collect()
print(rdd)

outputs:

[('a', 'b', 'c'), ('d', 'e')]

UPD: Thanks to @Rohan Aletty who corrected me.

– Ibraim Ganiev (Olologin)

It is possible to handle this without shuffling (groupBy), but it requires a little more code compared to the solutions by Olologin and Rohan Aletty. The whole idea is to transfer only the parts required to keep continuity between partitions:

from functools import reduce  # reduce is not a builtin in Python 3
from toolz import partition, drop, take, concatv


def grouped(self, n, pad=None):
    """
    Group RDD into tuples of size n

    >>> rdd = sc.parallelize(range(10))
    >>> grouped(rdd, 3).collect()
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]
    """
    assert isinstance(n, int)
    assert n > 0

    def _analyze(i, iter):
        """
        Given partition idx and iterator return a tuple
        (idx, number-of-elements, prefix-of-size-(n-1))
        """
        xs = [x for x in iter]
        return [(i, len(xs), xs[:n - 1])]

    def _compact(prefixes, prefix):
        """
        'Compact' a list of prefixes to compensate for
        partitions with less than (n-1) elements
        """
        return prefixes + [(prefix + prefixes[-1])[:n-1]]

    def _compute(prvs, cnt):
        """
        Compute number of elements to drop from current and
        take from the next partition given the previous state
        """
        left_to_drop, _to_drop, _to_take = prvs[-1]
        diff = cnt - left_to_drop

        if diff <= 0:
            return prvs + [(-diff, cnt, 0)]

        else:
            to_take = (n - diff % n) % n
            return prvs + [(to_take, left_to_drop, to_take)]

    def _group_partition(i, iter):
        """
        Return grouped entries for a given partition
        """
        (_, to_drop, to_take), next_head = heads_bd.value[i]
        return partition(n, concatv(
            drop(to_drop, iter), take(to_take, next_head)), pad=pad)

    if n == 1:
        return self.map(lambda x: (x, ))

    idxs, counts, prefixes = zip(
        *self.mapPartitionsWithIndex(_analyze).collect())

    heads_bd = self.context.broadcast({x[0]: (x[1], x[2]) for x in zip(idxs,
        reduce(_compute, counts, [(0, None, None)])[1:],
        reduce(_compact, prefixes[::-1], [[]])[::-1][1:])})

    return self.mapPartitionsWithIndex(_group_partition) 

It depends heavily on the excellent toolz library, but if you prefer to avoid external dependencies you can easily rewrite it using the standard library.
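
If you do want to drop the toolz dependency, here is a minimal sketch of standard-library stand-ins that cover the way the helpers are used above (simplified versions of my own, not the toolz API itself):

from itertools import chain, islice

def drop(n, seq):
    # Skip the first n elements of an iterable
    return islice(seq, n, None)

def take(n, seq):
    # Yield at most the first n elements of an iterable
    return islice(seq, n)

def concatv(*seqs):
    # Chain several iterables together
    return chain(*seqs)

def partition(n, seq, pad=None):
    # Split an iterable into n-tuples; the last tuple is padded with `pad`.
    # toolz only pads when a pad value is supplied, but grouped() above always
    # passes one, so this simplified behaviour is sufficient here.
    it = iter(seq)
    while True:
        chunk = tuple(islice(it, n))
        if not chunk:
            return
        if len(chunk) < n:
            chunk += (pad,) * (n - len(chunk))
        yield chunk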

Example usage:

>>> rdd = sc.parallelize(range(10))
>>> grouped(rdd, 3).collect()
[(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]
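
If, as in the question, you would rather have a shorter last tuple than None padding, a small post-processing step works (my assumption, not part of the original answer; it only behaves correctly when None is not a legitimate value in your data):

>>> grouped(rdd, 3).map(lambda t: tuple(x for x in t if x is not None)).collect()
[(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]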

If you want to keep a consistent API you can monkey-patch the RDD class:

>>> from pyspark.rdd import RDD
>>> RDD.grouped = grouped
>>> rdd.grouped(4).collect()
[(0, 1, 2, 3), (4, 5, 6, 7), (8, 9, None, None)]

You can find basic tests on GitHub.

– zero323