I have a Spark job that needs to compute content-based movie similarities. There are 46k movies. Each movie is represented by a set of SparseVectors (each vector is a feature vector for one of the movie's fields such as Title, Plot, Genres, Actors, etc.). For Actors and Genres, for example, the vector shows whether a given actor or genre is present (1) or absent (0) in the movie.
The task is to find the top 10 most similar movies for each movie. I managed to write a Scala script that performs all these computations. It works for smaller sets, such as 1000 movies, but not for the whole dataset (it runs out of memory, etc.).
I do this computation with a cross join of the movies dataset with itself, then reduce the problem by keeping only rows where movie1_id < movie2_id. Even so, the dataset at this point contains roughly 46000^2/2 = 1058000000 rows, and each row carries a significant amount of data.
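To make the size concrete, the pair count can be worked out directly. This is a plain Scala sketch; `PairCount` and `pairCount` are hypothetical names used only for illustration. The strict filter movie1_id < movie2_id keeps n(n-1)/2 rows, which is slightly below the n^2/2 estimate.

```scala
object PairCount {
  // Number of unordered pairs (movie1_id < movie2_id) among n movies.
  // Long keeps the arithmetic safe: n * n is already close to Int.MaxValue at n = 46000.
  def pairCount(n: Long): Long = n * (n - 1) / 2

  def main(args: Array[String]): Unit = {
    val n = 46000L
    println(s"full cross join: ${n * n} rows")
    println(s"after id filter: ${pairCount(n)} rows")
  }
}
```

For 46000 movies this gives 1057977000 rows after the filter, so the filter halves the cross join but the result still grows quadratically with the number of movies.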
Then I calculate a similarity score for each row. Once the similarity is calculated, I group the results by movie1_id, sort each group in descending order of similarity score using a Window function, and take the top N items (similar to how it's described here: Spark get top N highest score results for each (item1, item2, score)).
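The top-N step can be sketched roughly as follows. This is an illustration, not the actual job: the object and function names are hypothetical, and the column name `score` is an assumption (movie1_id and movie2_id are the names used above). One thing the sketch adds on purpose: because only pairs with movie1_id < movie2_id are kept, a movie also appears as movie2_id in some rows, so the pairs are mirrored before ranking. That mirroring assumes the score is symmetric; if it is not, both directions would have to be scored separately.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object TopNSimilar {
  // `scores` is assumed to have columns (movie1_id, movie2_id, score),
  // with each unordered pair present once (movie1_id < movie2_id).
  def topNPerMovie(scores: DataFrame, n: Int): DataFrame = {
    // Mirror every pair so that each movie shows up in movie1_id for all of its pairs.
    val mirrored = scores.select(
      col("movie2_id").as("movie1_id"),
      col("movie1_id").as("movie2_id"),
      col("score"))
    // union is positional; both sides have the column order (movie1_id, movie2_id, score).
    val allPairs = scores.select("movie1_id", "movie2_id", "score").union(mirrored)

    // Rank each movie's candidates by descending score and keep the first n.
    val byScore = Window.partitionBy("movie1_id").orderBy(col("score").desc)
    allPairs
      .withColumn("rank", row_number().over(byScore))
      .filter(col("rank") <= n)
  }
}
```

Calling `TopNSimilar.topNPerMovie(scores, 10)` would return at most 10 rows per movie1_id, with an extra `rank` column.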
The question is: can this be done more efficiently in Spark, e.g. without having to perform a crossJoin?
Another question: how does Spark deal with such huge DataFrames (1058000000 rows consisting of multiple SparseVectors)? Does it have to keep all of this in memory at once, or does it process such DataFrames piece by piece somehow?
I'm using the following function to calculate similarity between movie vectors:
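// BSV is assumed to be an alias for breeze.linalg.SparseVector;
// toBreeze and magnitude are helpers defined elsewhere in the job (not shown).
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
  val a: BSV[Double] = toBreeze(movie1Vec)
  val b: BSV[Double] = toBreeze(movie2Vec)

  // Dot product over the active entries of a only.
  var dot: Double = 0.0
  var offset: Int = 0
  while (offset < a.activeSize) {
    val index: Int = a.indexAt(offset)
    val value: Double = a.valueAt(offset)
    dot += value * b(index)
    offset += 1
  }

  // b restricted to the indices that are active in a.
  // The last argument is the vector dimension, so a.length is used rather than a.index.length.
  val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.length)
  val maga: Double = magnitude(a)
  val magb: Double = magnitude(bReduced)
  if (maga == 0 || magb == 0) 0.0
  else dot / (maga * magb)
}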
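To show what this function computes on a small input, here is a sketch of the same logic on plain Scala maps (index -> value) instead of Breeze vectors. The object name is hypothetical, and it assumes that `magnitude` is the Euclidean norm, which is not shown in the code above. If that reading is right, the measure is not symmetric in its arguments, which may matter when only pairs with movie1_id < movie2_id are scored.

```scala
object IntersectionCosineDemo {
  // Sparse vectors modeled as index -> value maps (an assumption for this sketch;
  // the job itself uses Spark/Breeze SparseVectors).
  def intersectionCosine(a: Map[Int, Double], b: Map[Int, Double]): Double = {
    val entries = a.toSeq
    // Dot product over a's active indices only.
    val dot = entries.map { case (i, v) => v * b.getOrElse(i, 0.0) }.sum
    // Euclidean norm of a (assumed meaning of `magnitude`).
    val magA = math.sqrt(entries.map { case (_, v) => v * v }.sum)
    // Euclidean norm of b restricted to a's active indices.
    val magBReduced = math.sqrt(entries.map { case (i, _) =>
      val w = b.getOrElse(i, 0.0)
      w * w
    }.sum)
    if (magA == 0.0 || magBReduced == 0.0) 0.0 else dot / (magA * magBReduced)
  }

  def main(args: Array[String]): Unit = {
    val small = Map(0 -> 1.0, 2 -> 1.0)                      // e.g. a movie with 2 actors
    val large = Map(0 -> 1.0, 1 -> 1.0, 2 -> 1.0, 3 -> 1.0)  // a movie with 4 actors
    println(intersectionCosine(small, large)) // close to 1.0: every actor of `small` is in `large`
    println(intersectionCosine(large, small)) // close to 0.707: only half of `large` is in `small`
  }
}
```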
Each row in the Dataframe consists of two joined classes:
final case class MovieVecData(
  imdbID: Int,
  Title: SparseVector,
  Decade: SparseVector,
  Plot: SparseVector,
  Genres: SparseVector,
  Actors: SparseVector,
  Countries: SparseVector,
  Writers: SparseVector,
  Directors: SparseVector,
  Productions: SparseVector,
  Rating: Double
)