As I see it, there are two possible solutions for this matter:
- With a reduceByKey
- With a mapPartitions
Let's see both of them with an example.
I have a dataset of 100,000 movie ratings with the format (idUser, (idMovie, rating)). Let's say we would like to know how many different users have rated at least one movie.
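The construction of rddSplitted is not shown in this answer; as a minimal, hypothetical sketch, assuming the MovieLens u.data file with tab-separated fields (userId, movieId, rating, timestamp), it could look like this:

import org.apache.spark.{SparkConf, SparkContext}
// Hypothetical setup: build rddSplitted as (idUser, (idMovie, rating)) pairs
// from the MovieLens u.data file (tab-separated: userId, movieId, rating, timestamp).
val conf = new SparkConf().setAppName("MovieSimilaritiesRicImproved").setMaster("local[2]")
val sc = new SparkContext(conf)
val lines = sc.textFile("C:/spark/ricardoExercises/ml-100k/u.data")
val rddSplitted = lines.map { line =>
  val fields = line.split("\t")
  (fields(0).toInt, (fields(1).toInt, fields(2).toDouble))
}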
Let's first take a look using distinct:
val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is ${numUsers.count()}")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)
We will get the following results:
numUsers is 943
*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
| ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
+-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
| MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
With the toDebugString function we can analyze what is happening with our RDDs in much more detail: the number in parentheses is the number of partitions, and each new indentation level marks a shuffle boundary.
Now let's use reduceByKey: for instance, counting how many times each user has voted and, at the same time, obtaining the number of different users:
val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(_ + _)
println(s"numUsers is ${numUsers2.count()}")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)
We will now get these results:
numUsers is 943
*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
+-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
Analyzing the RDDs produced, we can see that reduceByKey achieves the same result more efficiently than distinct did before: the lineage is shorter, since the shuffle output is used directly instead of going through the extra MapPartitionsRDD steps that distinct adds around its own shuffle.
Finally, let's use mapPartitions. The main goal is to first remove duplicate users within each partition of our dataset, and only then obtain the final set of distinct users.
val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: ${a1.count}")
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)
println(s"Number of elements in a2: ${a2.count}")
val a3 = a2.distinct()
println("There are "+ a3.count()+" different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)
We will get the following:
Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users
*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
| ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
+-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
| MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
| MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
| MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
| C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
| C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []
We can see now that mapPartitions first gets the distinct users within each partition of the dataset, shrinking the number of records from 100,000 to 1,709 without performing any shuffle. Then, with this much smaller amount of data, a distinct over the whole RDD can be carried out without worrying about the shuffle, and the result is obtained much faster.
I would recommend this last approach with mapPartitions over the reduceByKey one, as it moves a smaller amount of data around. Another solution could be to combine both functions: first apply mapPartitions as described above, and then replace the final distinct with the reduceByKey counting shown earlier, as sketched below.
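As a rough sketch of that combined variant (assuming the same rddSplitted as above; this is not taken from the original code):

// Hypothetical combination of both techniques: deduplicate users within each
// partition first, then count the distinct users with reduceByKey instead of distinct.
val perPartitionDistinct = rddSplitted
  .map(x => x._1)                                     // keep only idUser
  .mapPartitions(users => users.toList.distinct.toIterator)
val numDistinctUsers = perPartitionDistinct
  .map(user => (user, 1))
  .reduceByKey(_ + _)                                 // at most one record per user per partition reaches the shuffle
  .count()
println(s"There are $numDistinctUsers different users")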