
I have two Spark RDDs: A has 301,500,000 rows and B has 1,500,000 rows. All 1,500,000 rows of B also appear in A. I would like the set difference of the two RDDs, so that I get back A with 300,000,000 rows, with the 1,500,000 rows from B no longer present.

I cannot use Spark DataFrames.

Here is the approach I am using right now. These RDDs have primary keys. Below, I collect the primary keys that appear in B into a list, then filter A down to the rows whose primary key does not appear in that list.

a = sc.parallelize([[0, 'foo', 'a'], [1, 'bar', 'b'], [2, 'mix', 'c'],
                    [3, 'hem', 'd'], [4, 'line', 'e']])
b = sc.parallelize([[1, 'bar', 'b'], [2, 'mix', 'c']])
b_primary_keys = b.map(lambda x: x[0]).collect()  # first column is the primary key


def sep_a_and_b(row):
    # Keep the row only if its primary key does not appear in B
    primary_key = row[0]
    if primary_key not in b_primary_keys:
        return row


a_minus_b = a.map(sep_a_and_b).filter(lambda x: x is not None)

Now, this works in this sample problem because A and B are tiny. However, it is unsuccessful when I use my true datasets A and B: membership testing against a collected list of 1.5 million keys is a linear scan for every one of A's 301.5 million rows. Is there a better (more parallel) way to implement this?

Katya Willard

1 Answer


This seems like something you can solve with `subtractByKey`:

filtered_a = a.subtractByKey(b)

To convert each row to a key/value pair first:

key_val_rdd = rdd.map(lambda x: (x[0], x[1:]))  # key must be a hashable scalar, so x[0] rather than the list slice x[:1]

Note that my Python is weak and there may be better ways to split the values.
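Putting the two pieces together, per the resolution worked out in the comments below, here is a minimal end-to-end sketch reusing the sample RDDs `a` and `b` from the question (the intermediate names `a_kv` and `b_kv` are just illustrative):

# Key each row on its first column: [key, v1, v2] -> (key, [v1, v2])
a_kv = a.map(lambda x: (x[0], x[1:]))
b_kv = b.map(lambda x: (x[0], x[1:]))

# Drop every key in a_kv that also appears in b_kv
a_minus_b = a_kv.subtractByKey(b_kv)

# Restore the original flat-row layout if needed
result = a_minus_b.map(lambda kv: [kv[0]] + kv[1])

Because `subtractByKey` shuffles both RDDs by key, the work stays distributed across the executors; nothing needs to be collected to the driver.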

Justin Pihony
  • I consistently get the error `ValueError: too many values to unpack` on any large dataset (I also attempted this on a dataset where A = 2,000,000 rows and B = 6,000 rows). My impression is that the compiler has to unpack A, which is too large to keep in main memory. – Katya Willard Sep 29 '15 at 17:18
  • Possibly related to https://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack – Justin Pihony Sep 29 '15 at 17:27
  • Correct. It appears the issue is not size related but rather an issue of having RDDs with many columns, not simply `(key,value)` pairs. – Katya Willard Sep 29 '15 at 17:51
  • Can you map into a key value where key is the first item and value is the array? – Justin Pihony Sep 29 '15 at 17:59
  • Yes. I'm going to update my question to more accurately reflect my dataset. The answer was to map the dataset from `[key, val1, val2, val3]` --> `(key, [val1, val2, val3])`, then use `subtractByKey` on that transformed data. – Katya Willard Sep 29 '15 at 18:50
  • @KatyaHandler Addressed – Justin Pihony Sep 29 '15 at 19:53