I have a pyspark.rdd.PipelinedRDD called myRDD. This is its sample content:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),
 ((333, u'BB', u'B'), (999, u'BB', u'A')),...]

I need to remove every entry whose two tuples have different values in the third column. The expected result is this one:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),...]

How can I do it?

Dinosaurius

1 Answer

You can use filter with a lambda expression that checks whether the third elements of the two tuples in each pair are equal, for example:

l = [((111, u'BB', u'A'), (444, u'BB', u'A')),
     ((222, u'BB', u'A'), (888, u'BB', u'A')),
     ((333, u'BB', u'B'), (999, u'BB', u'A'))]

rdd = sc.parallelize(l)
rdd = rdd.filter(lambda x: x[0][2] == x[1][2])
result = rdd.collect()
print(result)

>>> [((111, u'BB', u'A'), (444, u'BB', u'A')), ((222, u'BB', u'A'), (888, u'BB', u'A'))]
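
The same condition can also be written as a named function and checked on a plain Python list before running it on a cluster. This is only a minimal sketch: the name third_values_match is hypothetical, and the commented-out Spark line assumes an existing SparkContext called sc, as in the snippet above.

```python
# Hypothetical helper name; the logic mirrors the lambda used with filter.
def third_values_match(pair):
    """Return True when both tuples in the pair share the same third element."""
    left, right = pair
    return left[2] == right[2]

pairs = [((111, 'BB', 'A'), (444, 'BB', 'A')),
         ((222, 'BB', 'A'), (888, 'BB', 'A')),
         ((333, 'BB', 'B'), (999, 'BB', 'A'))]

# Plain-Python check of the predicate; no SparkContext is needed here.
kept = [p for p in pairs if third_values_match(p)]
print(kept)

# With Spark, the same function could be passed to filter (assumes sc exists):
# rdd = sc.parallelize(pairs).filter(third_values_match)
```

Because filter only needs a function that returns a truthy or falsy value, a named function and a lambda are interchangeable here.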

To answer your follow-up comment: a lambda is just a function, so if you have more complex logic you can write it out as a named function. You could do something like:

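def do_stuff(x):
    # Keep the pair without comparing if either third value is 'C'.
    if x[0][2] == 'C' or x[1][2] == 'C':
        return x
    # Otherwise keep it only when the third values match.
    if x[0][2] == x[1][2]:
        return x
    # Returning None marks the pair for removal by the filter below.
    return None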

rdd = rdd.map(do_stuff).filter(lambda x: x is not None)

res = rdd.collect()
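
As an alternative to mapping to None and then filtering, the rule could be expressed as a boolean predicate and passed to filter directly. This is a sketch rather than the method above: keep_pair and its skip_value parameter are hypothetical names, and the sample rows containing 'C' are made up for illustration.

```python
def keep_pair(pair, skip_value='C'):
    """Keep the pair if either third element equals skip_value,
    or if both third elements match."""
    left, right = pair
    # No comparison is done when either side carries the skip value.
    if skip_value in (left[2], right[2]):
        return True
    return left[2] == right[2]

# Illustrative data only; the 'C' row is not from the original question.
sample = [((111, 'BB', 'A'), (444, 'BB', 'A')),
          ((333, 'BB', 'B'), (999, 'BB', 'A')),
          ((555, 'BB', 'C'), (777, 'BB', 'A'))]

# Local check with a list comprehension; no SparkContext is needed.
kept_pairs = [p for p in sample if keep_pair(p)]
print(kept_pairs)

# With Spark, the predicate could replace the map/filter pair:
# rdd = rdd.filter(keep_pair)
```

A single filter avoids the intermediate None values and makes one pass over the data instead of two chained transformations.
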
Varun Balupuri
It works nicely. By the way, is it possible to add an exception for the value `C`? If either of the pair's third column values is `C`, then no comparison should be done. – Dinosaurius Oct 26 '17 at 10:07