4

In my PySpark application, I have two RDDs:

  • items - This contains the item ID and item name for all valid items. Approximately 100,000 items.

  • attributeTable - This contains the fields user ID, item ID and an attribute value for that combination, in that order. There is one such attribute for each user-item combination in the system. This RDD has several hundred thousand rows.

I would like to discard all rows in attributeTable RDD that don't correspond to a valid item ID (or name) in the items RDD. In other words, a semi-join by the item ID. For instance, if these were R data frames, I would have done semi_join(attributeTable, items, by="itemID")

I tried the following approach first, but found that this takes forever to return (on my local Spark installation running on a VM on my PC). Understandably so, because there are such a huge number of comparisons involved:

# Create a broadcast variable of all valid item IDs so the filter can be done on the workers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

After a bit of fiddling around, I found that the following approach works pretty fast (a minute or so on my system).

# Create a broadcast variable for item ID to item name mapping (dictionary) 
itemIdToNameMap = sc.broadcast(items.collectAsMap())

# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
                  .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
                  .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
                  .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

Although this works well enough for my application, it feels more like a dirty workaround, and I am pretty sure there must be a cleaner or more idiomatic (and possibly more efficient) way to do this in Spark. What would you suggest? I am new to both Python and Spark, so any RTFM advice will also be helpful if you could point me to the right resources.

My Spark version is 1.3.1.

soorajmr
2 Answers


Just do a regular join and then discard the "lookup" relation (in your case, the items RDD).

If these are your RDDs (example taken from another answer):

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])

then you'd do:

(attributeTable.keyBy(lambda x: x[1])
               .join(items)
               .map(lambda (key, (attribute, item)): attribute))

As a result, you are left with only the tuples from the attributeTable RDD that have a corresponding entry in the items RDD:

[(123456, 123, 'Attribute for A')]

Doing it via leftOuterJoin as suggested in another answer will also do the job, but is less efficient. Also, the other answer semi-joins items with attributeTable instead of attributeTable with items.
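
As an aside, the broadcast approach from the question should also be workable at this data size, provided the ID set is built once up front instead of being rebuilt inside the filter lambda for every record. A rough sketch, reusing the question's variable names:

# Build the set of valid item IDs once on the driver and broadcast it,
# so each filter call is just a constant-time membership test.
validItemIDs = sc.broadcast(set(items.map(lambda (itemID, itemName): itemID).collect()))
attributeTable = attributeTable.filter(
    lambda (userID, itemID, attributes): itemID in validItemIDs.value)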

Jakub Kukul

As others have pointed out, this is probably most easily accomplished by leveraging DataFrames. However, you might be able to accomplish your intended goal by using the leftOuterJoin and the filter functions. Something a bit hackish like the following might suffice:

items = sc.parallelize([(123, "Item A"), (456, "Item B")])
attributeTable = sc.parallelize([(123456, 123, "Attribute for A")])
sorted(items.leftOuterJoin(attributeTable.keyBy(lambda x: x[1]))
       .filter(lambda x: x[1][1] is not None)
       .map(lambda x: (x[0], x[1][0])).collect())

returns

[(123, 'Item A')]
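
For reference, here is a rough sketch of the DataFrame route mentioned above (the column names and the sqlContext variable are assumptions, and it is not tested against 1.3.1). An inner join on the item ID followed by a select of only the attribute-table columns gives the semi-join semantics, since each item ID appears at most once in items:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
itemsDF = sqlContext.createDataFrame(items, ["itemID", "itemName"])
attrDF = sqlContext.createDataFrame(attributeTable, ["userID", "itemID", "attribute"])

# The inner join keeps only rows whose itemID exists in items; selecting just
# the attribute-table columns then behaves like a semi-join (no duplicate rows,
# because item IDs are unique in items).
result = (attrDF.join(itemsDF, attrDF.itemID == itemsDF.itemID)
                .select(attrDF.userID, attrDF.itemID, attrDF.attribute))

Newer Spark versions also expose a dedicated left-semi join type, which would avoid the explicit select step.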
santon
  • Never heard of anything called left inner join, but I get your point. However, I think the leftOuterJoin API assumes the first field in both the RDDs are to be used for join, which doesn't hold true in my case. You could rearrange the RDDs the way join APIs expect, but that will be as dirty and inefficient as the method I mentioned in my question. Also, a Spark join operation returns elements from the second RDD tupled with those from the first, which I will need to filter out. I need only the fields from the first RDD. Like this: https://en.wikipedia.org/wiki/Relational_algebra#Semijoin – soorajmr Jul 01 '15 at 01:24
  • Gotcha. I updated the code slightly. It's similar to your code, to be sure, but I'm trying to avoid the `collect` and `broadcast` pattern you're using because that creates a bottleneck and won't scale if `attributeTable` is large. (Maybe that's not an issue if, as you say, it will only ever be 1000's of rows. In any case, it still seems just a bit more straightforward.) – santon Jul 01 '15 at 18:07
  • That looks reasonable. I hadn't noticed the keyBy API. I haven't run your code though, and I cannot comment on the efficiency. – soorajmr Jul 02 '15 at 10:09
  • Do you mind adding a line to your answer saying (like ABC pointed out) data frames are probably better suited for this kind of thing, so I could mark your answer as accepted? That will then be helpful for people who land on this page in future looking for a solution. – soorajmr Jul 02 '15 at 10:12
  • Sure. I just modified it. If I get around to it then maybe I can put the DataFrame solution as well :-) – santon Jul 02 '15 at 17:09