I am attempting to build an online recommender system using Spark's ALS recommendation algorithm. My data resides in MongoDB, where I keep collections of users, items and ratings. The identifiers for these documents are of the default type ObjectID. I am looking for an efficient way to map these ObjectID values to the int IDs required by ALS. Concretely, my ratings collection consists of entries of the structure {user: ObjectID, item: ObjectID, rating: float}.
My recommender system will be fed new ratings regularly, which requires new ALS models to be computed with the incoming batches of ratings. I therefore do not plan to save the models, and consider the easiest implementation to be fetching the new ratings from MongoDB based on their timestamp and that of the last trained model. The new ratings are then processed in Spark to get int IDs assigned, so I am looking for the most efficient implementation of that step. Below I elaborate on my attempt; any feedback on how to improve my approach will be greatly appreciated.
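For context, the incremental retrieval I have in mind looks roughly like this (a sketch using plain pymongo; the connection string and the created_at timestamp field are assumptions about my setup, and last_trained would come from the previously trained model):

from datetime import datetime, timedelta
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/recommender')  # placeholder connection string
db = client.get_default_database()

# Timestamp of the last trained model; shown here as a dummy value
last_trained = datetime.utcnow() - timedelta(hours=1)

# Fetch only ratings newer than the last model (assumes a 'created_at' field on each document)
new_docs = db.ratings.find({'created_at': {'$gt': last_trained}})
new_ratings = sc.parallelize([(d['user'], d['item'], d['rating']) for d in new_docs])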
My attempt
As per the answer to this question, I have tried to implement strategies using RDD.zipWithUniqueId() and RDD.zipWithIndex(). My procedure is as follows, substituting zipWithUniqueId() for zipWithIndex() in the second variation:
# Retrieve from MongoDB (using pymongo_spark)
ratings_mongo = sc.mongoRDD(mongo_path)
ratings = ratings_mongo.map(lambda r: (r['user'], r['item'], r['rating']))
# Get distinct users and items
users = ratings.map(lambda r: r[0]).distinct()
items = ratings.map(lambda r: r[1]).distinct()
# Zips the RDDs and creates 'mirrored' RDDs to facilitate reverse mapping
user_int = users.zipWithIndex()
int_user = user_int.map(lambda u: (u[1], u[0]))
item_int = items.zipWithIndex()
int_item = item_int.map(lambda i: (i[1], i[0]))
# Substitutes the ObjectIDs in the ratings RDD with the corresponding int values
# Key by user, join against user_int, then rebuild as (user_id, item, rating)
ratings = ratings.map(lambda r: (r[0], (r[1], r[2]))).join(user_int).map(lambda r: (r[1][1], r[1][0][0], r[1][0][1]))
# Key by item, join against item_int, then rebuild as (user_id, item_id, rating)
ratings = ratings.map(lambda r: (r[1], (r[0], r[2]))).join(item_int).map(lambda r: (r[1][0][0], r[1][1], r[1][0][1]))
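For completeness, once the ratings RDD holds int IDs I train the model roughly like this (rank and iteration count below are just placeholder values):

from pyspark.mllib.recommendation import ALS, Rating

# Placeholder hyperparameters; the ID mapping, not the tuning, is the issue here
model = ALS.train(ratings.map(lambda r: Rating(r[0], r[1], r[2])), rank=10, iterations=10)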
I am relatively new to the game, and I feel that there may be a more efficient way to go about this. Also, zipWithIndex() does not work on my full dataset: its progress stalls without giving an immediate error, although it does seem to work on smaller samples. zipWithUniqueId(), on the other hand, does complete, but seems horrendously slow.
Alternative 1
Use Python's hash():
def hash_ids(rating):
    return hash(rating[0]) & 0xffffffff, hash(rating[1]) & 0xffffffff, rating[2]

ratings = ratings.map(hash_ids)
Very fast to execute, but, with my limited understanding of hashing, this will ultimately result in collisions. Also, the collisions will always involve the same users and items, so some users' recommendations would always end up being someone else's. Am I right?
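As a sanity check I could compare distinct counts before and after hashing, something along these lines (reusing the users RDD of distinct ObjectIDs from above; a lower second count would indicate collisions):

raw_count = users.count()
hashed_count = users.map(lambda u: hash(u) & 0xffffffff).distinct().count()
print(raw_count, hashed_count)  # hashed_count < raw_count means at least one collision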
Alternative 2
Do the conversion outside of Spark, and possibly maintain a unique ID field of type int in the MongoDB documents.
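What I picture here is assigning each new user/item document a sequential int at insertion time, e.g. from a hypothetical counters collection (collection and field names below are just placeholders):

from pymongo import MongoClient, ReturnDocument

client = MongoClient('mongodb://localhost:27017/recommender')  # placeholder connection string
db = client.get_default_database()

def next_int_id(name):
    # Atomically increment and return a per-collection counter
    counter = db.counters.find_one_and_update(
        {'_id': name},
        {'$inc': {'seq': 1}},
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    return counter['seq']

# At insertion time, store the int ID alongside the ObjectID
db.users.insert_one({'name': 'some user', 'int_id': next_int_id('users')})

This would keep the ObjectID-to-int mapping in the database itself, so Spark never has to recompute it, at the cost of an extra write per insert.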