
Hi, I have created an RDD like below:

rdd1=sc.parallelize(['P','T','K'])
rdd1.collect()
['P', 'T', 'K']

Now I want to create a new RDD, RDD2, containing all possible combinations as shown below, i.e. excluding combinations that repeat the same element, such as (p,p), (k,k), (t,t).

My expected output when I run

RDD2.collect()

[
    ('P'),('T'),('K'),
    ('P','T'),('P','K'),('T','K'),('T','P'),('K','P'),('K','T'),
    ('P','T','K'),('P','K','T'),('T','P','K'),('T','K','P'),('K','P','T'),('K','T','P')
]
pault
Sai
  • You need a `crossJoin` and filter. Probably something like `rdd.crossJoin(rdd).filter(lambda row: row[0] != row[1])`. This can be generalized for more combinations. – pault Dec 03 '18 at 13:57
  • I am getting an error: "AttributeError: 'RDD' object has no attribute 'crossJoin'" – Sai Dec 03 '18 at 14:03
  • I have tried `rdd2 = rdd1.cartesian(rdd1)`, but it does not give all the combinations. – Sai Dec 03 '18 at 14:05
  • I guess you intend to do that for more than 3 values, right? – Steven Dec 03 '18 at 14:30

2 Answers


It seems that you want to generate all permutations of the elements in your rdd where each row contains unique values.

One way would be to first create a helper function that generates the desired combinations of length n:

from functools import reduce
from itertools import chain

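def combinations_of_length_n(rdd, n):
    # Requires n > 0.
    # Build the n-fold Cartesian product of the rdd with itself, flattening
    # each nested pair into a single tuple, then keep only the rows whose
    # n values are all distinct.
    # Caveat: chain.from_iterable iterates over each element, so elements
    # should be single-character strings or 1-tuples such as ('P',).
    return reduce(
        lambda a, b: a.cartesian(b).map(lambda x: tuple(chain.from_iterable(x))),
        [rdd]*n
    ).filter(lambda x: len(set(x))==n)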

Essentially, the function takes the Cartesian product of n copies of your rdd (that is, n - 1 calls to cartesian) and keeps only the rows where all of the values are distinct.
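
The same idea can be checked without Spark. The sketch below is plain Python, not PySpark: it uses `itertools.product` as a stand-in for the repeated `cartesian` calls and applies the same `len(set(x)) == n` filter. The helper name `distinct_product` is made up for illustration. Assuming the input elements are themselves distinct, the filtered product should match `itertools.permutations`.

```python
from itertools import permutations, product

def distinct_product(items, n):
    """Hypothetical local stand-in for combinations_of_length_n:
    take the n-fold Cartesian product, keep rows with n distinct values."""
    return [row for row in product(items, repeat=n) if len(set(row)) == n]

items = ['P', 'T', 'K']
pairs = distinct_product(items, 2)
triples = distinct_product(items, 3)

# With distinct input elements, the filtered product is exactly the
# set of permutations of that length.
assert sorted(pairs) == sorted(permutations(items, 2))
assert sorted(triples) == sorted(permutations(items, 3))
print(len(pairs), len(triples))
```

This is only a local check of the logic; it says nothing about how the Spark version is partitioned or ordered.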

We can test this out for n = [2, 3]:

print(combinations_of_length_n(rdd1, n=2).collect())
#[('P', 'T'), ('P', 'K'), ('T', 'P'), ('K', 'P'), ('T', 'K'), ('K', 'T')]

print(combinations_of_length_n(rdd1, n=3).collect())
#[('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

The final output that you want is just the union of these intermediate results with the original rdd (with the values mapped to tuples).

rdd1.map(lambda x: (x,))\
    .union(combinations_of_length_n(rdd1, 2))\
    .union(combinations_of_length_n(rdd1, 3)).collect()
#[('P',),
# ('T',),
# ('K',),
# ('P', 'T'),
# ('P', 'K'),
# ('T', 'P'),
# ('K', 'P'),
# ('T', 'K'),
# ('K', 'T'),
# ('P', 'T', 'K'),
# ('P', 'K', 'T'),
# ('T', 'P', 'K'),
# ('K', 'P', 'T'),
# ('T', 'K', 'P'),
# ('K', 'T', 'P')]

To generalize for any max number of repetitions:

num_reps = 3
reduce(
    lambda a, b: a.union(b),
    [
        combinations_of_length_n(rdd1.map(lambda x: (x,)), i+1)
        for i in range(num_reps)
    ]
).collect()
#Same as above

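Note: Cartesian products are expensive operations and should be avoided when possible. Here, the product of n copies of an rdd with m elements produces m^n rows before the filter is applied.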
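
To get a feel for that cost, the following plain-Python sketch (no Spark involved) compares how many rows the n-fold Cartesian product generates with how many survive the distinctness filter. The function name `rows_generated_vs_kept` is hypothetical, and the counts assume the rdd holds m distinct elements.

```python
from math import perm  # available in Python 3.8+

def rows_generated_vs_kept(m, n):
    """For m distinct elements and tuples of length n, return
    (rows produced by the n-fold Cartesian product, rows kept by the filter)."""
    return m ** n, perm(m, n)

for m, n in [(3, 3), (10, 3), (100, 3)]:
    generated, kept = rows_generated_vs_kept(m, n)
    print(f"m={m}, n={n}: generated={generated}, kept={kept}")
```

Both numbers grow quickly with m and n, so the result itself may be very large even before counting the rows that the filter discards.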

pault

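There are several ways. One is to loop over the permutation lengths, store the permutations in a local list, and then convert that list to an rdd. The loop below starts at length 2, so the single-element tuples are not included.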

>>> rdd1.collect()
['P', 'T', 'K']
>>> 
>>> import itertools
>>> l = []
>>> for i in range(2,rdd1.count()+1):
...     x = list(itertools.permutations(rdd1.toLocalIterator(),i))
...     l = l+x
... 
>>> rdd2 = sc.parallelize(l)
>>> 
>>> rdd2.collect()
[('P', 'T'), ('P', 'K'), ('T', 'P'), ('T', 'K'), ('K', 'P'), ('K', 'T'), ('P', 'T', 'K'), ('P', 'K', 'T'), ('T', 'P', 'K'), ('T', 'K', 'P'), ('K', 'P', 'T'), ('K', 'T', 'P')]
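
If the single-element tuples from the question are wanted as well, the local part of this approach could be written as below. This is a sketch in plain Python, with a list standing in for the collected rdd contents. `all_permutations` is a hypothetical helper name, and the resulting list would still need to be passed to `sc.parallelize`, which means everything is built on the driver first.

```python
from itertools import chain, permutations

def all_permutations(items):
    """Permutations of every length from 1 to len(items), shortest first."""
    items = list(items)
    return list(chain.from_iterable(
        permutations(items, k) for k in range(1, len(items) + 1)
    ))

result = all_permutations(['P', 'T', 'K'])
print(len(result))   # 3 + 6 + 6 = 15
print(result[:3])    # [('P',), ('T',), ('K',)]
```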
Ali Yesilli
  • This is a solution, but it won't scale. If the OP doesn't have much data, RDD/Spark shouldn't be used at all. BTW, I've also found it here for plain Python: https://stackoverflow.com/a/5898031/3415409 – eliasah Dec 03 '18 at 14:39
  • Hi, I gave a small sample for ease of understanding, but I (the OP) have a huge amount of data. – Sai Dec 03 '18 at 14:54