First of all, I have just started learning Scala, so apologies if this question is too basic.
I have ratings data, val ratings: RDD[Rating], where each Rating holds (userid, movieid, rating).
I want to calculate every user's average rating and create an RDD of (userid, average_rating).
After that, I want to filter the ratings data by each user's own average rating. For example, if a user's average rating is 2.0, I will keep only that user's rows whose rating is greater than or equal to 2.0.
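To make the goal concrete, here is a minimal sketch of the result I expect, written with plain Scala collections instead of Spark. The `Rating` case class is a hypothetical stand-in with the same field names as the class used in this question, and the sample values and the `ExpectedResult` object are made up purely for illustration.

```scala
// Hypothetical stand-in for the Rating class (same field names as in the question).
final case class Rating(user: Int, product: Int, rating: Double)

object ExpectedResult {
  // Made-up sample data, for illustration only.
  val sample: Seq[Rating] = Seq(
    Rating(1, 10, 1.0), Rating(1, 11, 3.0),                     // user 1: average 2.0
    Rating(2, 10, 4.0), Rating(2, 12, 5.0), Rating(2, 13, 3.0)  // user 2: average 4.0
  )

  // Step 1: userid -> average_rating
  val averages: Map[Int, Double] =
    sample.groupBy(_.user).map { case (user, rs) =>
      (user, rs.map(_.rating).sum / rs.size)
    }

  // Step 2: keep only the ratings that are >= the average of the same user
  val kept: Seq[Rating] = sample.filter(r => r.rating >= averages(r.user))

  def main(args: Array[String]): Unit = {
    println(averages)
    kept.foreach(println)
  }
}
```

With this sample, user 1 keeps only the 3.0 rating, and user 2 keeps the 4.0 and 5.0 ratings.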
Here is what I have tried so far:
val ratings = DATA.filter(row => row != first_header).map { line =>
  val fields = line.split(",") // split each line once instead of once per field
  new Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
// Calculating User Average
val counts = ratings.map(item => (item.user, item.rating)) // (userid, rating) pairs
val goodratingsum = counts
  .mapValues(value => (value, 1)) // map entry with a count of 1
  .reduceByKey {
    case ((sumL, countL), (sumR, countR)) =>
      (sumL + sumR, countL + countR)
  }
// (userid, average_rating); kept as an RDD, which is the type reported in the error below
val avguserrat = goodratingsum.mapValues {
  case (sum, count) => sum / count
}
// Trying to create a new RDD that is filtered by each user's average rating.
val goodRatings = ratings.filter(r => r.user == avguserrat._1 && ((r.rating : Double) >= avguserrat._2))
But when I try to access the userid and average rating from the reduced data avguserrat, I get these errors:
-value _1 is not a member of org.apache.spark.rdd.RDD[(Int, Double)]
-value _2 is not a member of org.apache.spark.rdd.RDD[(Int, Double)]
Why can't I access the userid and average rating values?
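For context on the error text: it reports that avguserrat has type RDD[(Int, Double)], which is a distributed collection of pairs and not a single pair, so the tuple members _1 and _2 are not defined on it. One possible direction, sketched here under assumptions and not a confirmed solution, is to key the ratings by user and join them with the per-user averages. The `Rating` case class is again a hypothetical stand-in, and the names `aboveOwnAverage`, `avgByUser`, and the local `SparkSession` setup are illustrative only.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the Rating class (same field names as in the question).
case class Rating(user: Int, product: Int, rating: Double)

object FilterByUserAverage {
  // Keep only the ratings that are >= the average rating of the same user.
  def aboveOwnAverage(ratings: RDD[Rating]): RDD[Rating] = {
    // (userid, (sum, count)) reduced per user, then turned into (userid, average)
    val avgByUser: RDD[(Int, Double)] = ratings
      .map(r => (r.user, (r.rating, 1)))
      .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
      .mapValues { case (sum, count) => sum / count }

    // Key each rating by user so it can be paired with that user's average.
    ratings
      .map(r => (r.user, r))
      .join(avgByUser)
      .filter { case (_, (r, avg)) => r.rating >= avg }
      .map { case (_, (r, _)) => r }
  }

  def main(args: Array[String]): Unit = {
    // Local session for trying the sketch out; a real job would configure this differently.
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sample = spark.sparkContext.parallelize(Seq(
      Rating(1, 10, 1.0), Rating(1, 11, 3.0), Rating(2, 10, 4.0)))
    aboveOwnAverage(sample).collect().foreach(println)
    spark.stop()
  }
}
```

The join keeps everything distributed. If the number of users is small, collecting the averages into a local Map and looking them up inside the filter might be a simpler alternative.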