I have a dataset with 40K entries; each entry looks like the following:
product/productId: B00004CK40 review/userId: A39IIHQF18YGZA review/profileName: C. A. M. Salas review/helpfulness: 0/0 review/score: 4.0 review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.
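Parsed out, the three fields I actually need are productId, userId, and score. A minimal standalone sketch of the parsing (my assumption: fields are tab-separated, and each field splits on the first ":", which is what my snippet below does as well):

```java
public class ParseEntry {
    public static void main(String[] args) {
        // One raw entry; assumption: "\t" separates the fields
        String s = "product/productId: B00004CK40\treview/userId: A39IIHQF18YGZA\t"
                 + "review/profileName: C. A. M. Salas\treview/helpfulness: 0/0\t"
                 + "review/score: 4.0\treview/time: 1175817600";

        String[] data = s.split("\t");
        String movieId = data[0].split(":")[1].trim();   // "B00004CK40"
        String userId  = data[1].split(":")[1].trim();   // "A39IIHQF18YGZA"
        double score   = Double.parseDouble(data[4].split(":")[1].trim()); // 4.0

        System.out.println(movieId + " " + userId + " " + score);
        // -> B00004CK40 A39IIHQF18YGZA 4.0
    }
}
```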
I'm using Spark to build a recommendation engine with:
org.apache.spark.mllib.recommendation.ALS;
org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
org.apache.spark.mllib.recommendation.Rating;
I want to know the best practice for converting a userId & productId String to a unique Integer in distributed mode using Spark. The reason I want to convert to Integers is that the recommendation.Rating constructor is the following:
Rating(int user, int product, double rating)
In addition, I'll need to preserve the mapping in order to go back from an Integer to the appropriate movieId String when I return the recommended movieIds for a certain user, so that a client could enter a userId String and get back the top 10 recommended movieId Strings.
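To show what I mean by preserving the mapping in both directions, here is the core idea in plain Java (a minimal local sketch; my understanding is that in Spark the first two steps would correspond to something like `movieIds.distinct().zipWithIndex()` on a JavaRDD, which assigns each distinct id a unique Long index without any driver-side mutable state):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

public class IdMappingSketch {
    public static void main(String[] args) {
        // Raw String ids, duplicates included, as in the ratings data
        List<String> movieIds = Arrays.asList("B00004CK40", "B000063W1R", "B00004CK40");

        // Deduplicate while keeping encounter order (Spark analogue: distinct())
        List<String> distinctIds = new ArrayList<>(new LinkedHashSet<>(movieIds));

        // Assign each distinct id an index (Spark analogue: zipWithIndex()),
        // and keep BOTH directions so I can translate back later
        Map<String, Integer> idToIndex = new HashMap<>();
        Map<Integer, String> indexToId = new HashMap<>();
        for (int i = 0; i < distinctIds.size(); i++) {
            idToIndex.put(distinctIds.get(i), i);
            indexToId.put(i, distinctIds.get(i));
        }

        // Forward lookup when building Rating(int user, int product, double rating)
        int product = idToIndex.get("B00004CK40");
        // Reverse lookup when returning recommended movieId Strings
        String back = indexToId.get(product);

        System.out.println(product + " -> " + back);
        // -> 0 -> B00004CK40
    }
}
```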
Code snippet for building the Rating RDD:
JavaRDD<Rating> ratings = movieData.map(
    new Function<String, Rating>() {
        @Override
        public Rating call(String s) throws Exception {
            // getting entry data
            int movieIdConverted, userIdConverted;
            String[] data = s.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            String movieScore = data[4].split(":")[1].trim();
            if (movieIdsHashMapMirror.containsKey(movieId)) {
                movieIdConverted = movieIdsHashMapMirror.get(movieId);
            } else {
                // saving mappings of movieId
                movieIdConverted = movieIdCounter;
                movieIdsHashMap.put(movieIdCounter, movieId);
                movieIdsHashMapMirror.put(movieId, movieIdCounter);
                movieIdCounter++;
            }
            if (userIdsHashMapMirror.containsKey(userId)) {
                userIdConverted = userIdsHashMapMirror.get(userId);
            } else {
                // saving the mappings of userId
                userIdConverted = userIdCounter;
                userIdsHashMapMirror.put(userId, userIdCounter);
                userIdCounter++;
            }
            // Rating(user: Int, product(movieId): Int, rating: Double)
            return new Rating(userIdConverted, movieIdConverted, Double.parseDouble(movieScore));
        }
    }
);
The data structures used to preserve the mappings:
// movieId mappings: Integer -> String (to go back) and String -> Integer (to convert)
private static HashMap<Integer, String> movieIdsHashMap = new HashMap<Integer, String>();
private static HashMap<String, Integer> movieIdsHashMapMirror = new HashMap<String, Integer>();
// userId mapping: String -> Integer
private static HashMap<String, Integer> userIdsHashMapMirror = new HashMap<String, Integer>();
private static int movieIdCounter = 0;
private static int userIdCounter = 0;
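For completeness, this is how I'd use `movieIdsHashMap` to translate the recommendations back to Strings for the client. The int product ids below stand in for what I understand `model.recommendProducts(user, 10)` would return on a trained MatrixFactorizationModel (a sketch only; I haven't wired this up end to end):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReverseLookupSketch {
    public static void main(String[] args) {
        // Integer -> String mapping preserved while building the Rating RDD
        Map<Integer, String> movieIdsHashMap = new HashMap<>();
        movieIdsHashMap.put(0, "B00004CK40");
        movieIdsHashMap.put(1, "B000063W1R");

        // Stand-in for the product ids of the top-N Ratings returned
        // by recommendProducts (assumption: already sorted by score)
        int[] recommended = {1, 0};

        // Translate each recommended int back to its movieId String
        List<String> movieIdStrings = new ArrayList<>();
        for (int p : recommended) {
            movieIdStrings.add(movieIdsHashMap.get(p));
        }

        System.out.println(movieIdStrings);
        // -> [B000063W1R, B00004CK40]
    }
}
```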
Collaborative Filtering: http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html