I have a Spark job that needs to compute movie content-based similarities. There are 46k movies. Each movie is represented by a set of SparseVectors (each vector is a feature vector for one of the movie's fields such as Title, Plot, Genres, Actors, etc.). For Actors and Genres, for example, the vector shows whether a given actor is present (1) or absent (0) in the movie.

The task is to find the top 10 most similar movies for each movie. I managed to write a Scala script that performs all those computations and does the job. It works for smaller sets, such as 1000 movies, but not for the whole dataset (it runs out of memory, etc.).

I do this computation with a cross join of the movies dataset against itself, then reduce the problem by keeping only the rows where movie1_id < movie2_id. Even so, the dataset at this point contains roughly 46000^2/2 rows, which is about 1058000000, and each row carries a significant amount of data.
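As a quick check of that figure, here is a small sketch of the arithmetic (the variable names are only illustrative). The cross join itself produces 46000^2 rows, and the id filter keeps n(n-1)/2 of them, which is slightly fewer than 46000^2/2:

```scala
// Row counts for a self cross join of 46k movies.
val movies: Long = 46000L

// Rows produced by the cross join before any filtering.
val crossJoinRows: Long = movies * movies

// Rows left after keeping only movie1_id < movie2_id: n * (n - 1) / 2.
val pairsAfterFilter: Long = movies * (movies - 1) / 2

println(s"cross join: $crossJoinRows, after filter: $pairsAfterFilter")
```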

Then I calculate the similarity score for each row. After that, I group the results by movie1_id, sort each group in descending order of similarity score using a Window function, and take the top N items (similar to how it's described here: Spark get top N highest score results for each (item1, item2, score)).
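To make the "top N per movie" step concrete, here is a minimal sketch in plain Scala collections (not the Spark Window API used in the job). It keeps a bounded min-heap per movie1 id, so a full sort of each group is not needed. The `Scored` case class, the function name and the sample scores are all made up for illustration:

```scala
import scala.collection.mutable

// (movie1Id, movie2Id, score) triples; the values below are made up.
final case class Scored(movie1: Int, movie2: Int, score: Double)

// Keep only the n best-scoring neighbours per movie1 using a bounded min-heap,
// so at most n candidates per movie are held at any time.
def topNPerMovie(rows: Iterator[Scored], n: Int): Map[Int, List[Scored]] = {
  // Reversed ordering: the heap's head is the lowest score.
  val minFirst = Ordering.by[Scored, Double](_.score).reverse
  val heaps = mutable.Map.empty[Int, mutable.PriorityQueue[Scored]]
  rows.foreach { r =>
    val heap = heaps.getOrElseUpdate(r.movie1, mutable.PriorityQueue.empty[Scored](minFirst))
    heap.enqueue(r)
    if (heap.size > n) heap.dequeue() // drop the current worst candidate
  }
  // Return each movie's survivors, best score first.
  heaps.map { case (id, heap) => id -> heap.toList.sortBy(-_.score) }.toMap
}

val sample = Iterator(Scored(1, 2, 0.9), Scored(1, 3, 0.4), Scored(1, 4, 0.7), Scored(2, 3, 0.5))
val top2 = topNPerMovie(sample, 2)
println(top2(1).map(_.movie2)) // movie 1 keeps neighbours 2 and 4, best first
```

The same bounded-heap idea could in principle be applied per group inside Spark, but that is an assumption and not something I have measured.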

The question is - can it be done more efficiently in Spark? E.g. without having to perform a crossJoin?

And another question - how does Spark deal with such huge DataFrames (1058000000 rows consisting of multiple SparseVectors)? Does it have to keep all of this in memory at once? Or does it process such DataFrames piece by piece somehow?


I'm using the following function to calculate similarity between movie vectors:

// BSV is an alias for breeze.linalg.SparseVector; toBreeze and magnitude are
// helper functions defined elsewhere in the job (not shown here).
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
  val a: BSV[Double] = toBreeze(movie1Vec)
  val b: BSV[Double] = toBreeze(movie2Vec)

  // Dot product over the indices that are active in the first vector only.
  var dot: Double = 0
  var offset: Int = 0
  while (offset < a.activeSize) {
    val index: Int = a.indexAt(offset)
    val value: Double = a.valueAt(offset)

    dot += value * b(index)
    offset += 1
  }

  // Second vector restricted to the indices of the first one.
  val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.length)
  val maga: Double = magnitude(a)
  val magb: Double = magnitude(bReduced)

  if (maga == 0 || magb == 0) 0.0
  else dot / (maga * magb)
}

Each row in the DataFrame consists of two joined instances of the following case class:

final case class MovieVecData(imdbID: Int,
                          Title: SparseVector,
                          Decade: SparseVector,
                          Plot: SparseVector,
                          Genres: SparseVector,
                          Actors: SparseVector,
                          Countries: SparseVector,
                          Writers: SparseVector,
                          Directors: SparseVector,
                          Productions: SparseVector,
                          Rating: Double
                         )
  • You could reuse breeze cosine similarity instead of making your own.. https://github.com/scalanlp/breeze/blob/master/math/src/main/scala/breeze/linalg/functions/cosineDistance.scala – Michel Lemay May 08 '18 at 13:21
  • Can you not see the difference? My similarity is quite different from the standard cosine one – Daniil Andreyevich Baunov May 08 '18 at 21:28
  • This is an interesting choice of similarity. What motivated it? That said, you could have that one liner for the same result: `1 - cosineDistance(a, new BSV(a.index, a.index.map(b(_)), a.length))` – Michel Lemay May 09 '18 at 11:55
  • True, I could do that. Thanks. This similarity measure only considers those indexes that are present in the first vector. This way, if all items present in the first vector are also present in the second, the similarity will be 1. I found it to be the best measure to calculate similarities between movies (like how much the second movie intersects with the first one). – Daniil Andreyevich Baunov May 09 '18 at 15:48
  • But then your similarity function is not symmetric. Have you considered Jaccard distance? It might be well suited for that kind of use case. – Michel Lemay May 09 '18 at 17:52
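
To illustrate the asymmetry discussed in the comments above, here is a minimal sketch that models sparse vectors as index -> value maps. That representation is an assumption made to avoid depending on Spark or Breeze (the question uses SparseVector), and the `Sparse` alias and the sample vectors are made up. Jaccard similarity is included only for comparison:

```scala
// Sparse vectors modelled as index -> value maps (illustrative stand-in).
type Sparse = Map[Int, Double]

def magnitude(values: Iterable[Double]): Double =
  math.sqrt(values.iterator.map(v => v * v).sum)

// Cosine restricted to the indices that are active in `a`.
def intersectionCosine(a: Sparse, b: Sparse): Double = {
  val bReduced = a.keys.toSeq.map(i => b.getOrElse(i, 0.0))
  val dot = a.iterator.map { case (i, v) => v * b.getOrElse(i, 0.0) }.sum
  val magA = magnitude(a.values)
  val magB = magnitude(bReduced)
  if (magA == 0.0 || magB == 0.0) 0.0 else dot / (magA * magB)
}

// Jaccard similarity on the sets of active indices, symmetric by construction.
def jaccard(a: Sparse, b: Sparse): Double = {
  val union = (a.keySet | b.keySet).size
  if (union == 0) 0.0 else (a.keySet & b.keySet).size.toDouble / union
}

val small: Sparse = Map(1 -> 1.0, 3 -> 1.0)
val large: Sparse = Map(1 -> 1.0, 3 -> 1.0, 5 -> 1.0)
println(intersectionCosine(small, large)) // close to 1: every index of `small` is in `large`
println(intersectionCosine(large, small)) // about 0.82: index 5 is missing from `small`
println(jaccard(small, large))            // 2/3 in both directions
```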

5 Answers

8

It can be done more efficiently, as long as you are fine with approximations and don't require exact results (or an exact number of results).

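Similarly to my answer to Efficient string matching in Apache Spark, you can use LSH. Spark ML ships two LSH implementations: BucketedRandomProjectionLSH (for Euclidean distance) and MinHashLSH (for Jaccard distance).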

If the feature space is small (or can be reasonably reduced) and each category is relatively small, you can also optimize your code by hand:

  • Explode the feature array to generate #features records from a single record.
  • Self join the result by feature, compute the distance and filter out candidates (each pair of records will be compared if and only if they share a specific categorical feature).
  • Take the top records using your current code.

A minimal example would be (consider it to be a pseudocode):

import org.apache.spark.ml.linalg._
import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

// One row per (record, active feature index), then a self join on that index.
val possibleMatches = df
  .withColumn("key", explode(indices($"features")))
  .transform(df => df.alias("left").join(df.alias("right"), Seq("key")))

// intersectionCosine is the function from the question.
def closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)

// 0.5 is an arbitrary threshold chosen for illustration.
possibleMatches.filter(closeEnough(0.5)($"left.features", $"right.features")).select($"left.id", $"right.id").distinct

Note that both solutions are worth the overhead only if hashing / features are selective enough (and optimally sparse). In the example shown above you'd compare only rows inside set {1, 2, 3} and {4, 5}, never between sets.

However, in the worst case scenario (M records, N features) we can make N * M^2 comparisons instead of M^2.
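
To check the claim about which rows get compared, here is a sketch of the same candidate generation in plain Scala collections instead of DataFrames. The data is copied from the example above; the names `features`, `byFeature` and `candidates` are only illustrative, and the sketch lists unordered pairs (with an asymmetric similarity you would evaluate both directions):

```scala
// id -> active feature indices, copied from the example DataFrame above.
val features: Map[Long, Set[Int]] = Map(
  1L -> Set(1, 3, 5),
  2L -> Set(3, 8, 12),
  3L -> Set(3, 5),
  4L -> Set(11, 21),
  5L -> Set(21, 32)
)

// Inverted index: feature index -> ids of the records containing it
// (the plain-Scala counterpart of explode followed by grouping on the key).
val byFeature: Map[Int, Seq[Long]] = features.toSeq
  .flatMap { case (id, idx) => idx.toSeq.map(_ -> id) }
  .groupBy(_._1)
  .map { case (key, pairs) => key -> pairs.map(_._2) }

// Candidate pairs: records that share at least one feature (the self join by key).
val candidates: Set[(Long, Long)] = byFeature.values
  .flatMap(ids => for (a <- ids; b <- ids if a < b) yield (a, b))
  .toSet

println(candidates.toSeq.sorted) // the pairs (1,2), (1,3), (2,3) and (4,5)
```

Only 4 of the 10 possible pairs survive, and none of them crosses between {1, 2, 3} and {4, 5}.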

Alper t. Turker
  • Can you please explain a bit more on the second approach? Without using LSH? I also updated my question with a bit more details. Would this approach still fit? – Daniil Andreyevich Baunov Apr 29 '18 at 16:13
  • This is my first time when I see `transform` used (!) Thanks. – Jacek Laskowski May 08 '18 at 07:49
  • hi-Zir or any other do you have any solution for this...Please let me know -> https://stackoverflow.com/questions/52923110/spark-python-how-to-calculate-jaccard-similarity-between-each-line-within-an-rd – Anil Kumar May 13 '19 at 10:03
1

Another thought: given that your matrix is relatively small and sparse, it can fit in memory using a breeze CSCMatrix[Int].

Then you can compute co-occurrences using A'B (A.transposed * B), followed by a top-N selection on the LLR (log-likelihood ratio) of each pair. Here, since you keep only the top 10 items per row, the output matrix will be very sparse as well.

You can look up the details here:

https://github.com/actionml/universal-recommender
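
For reference, here is a sketch of the LLR score for a single pair, written from the standard entropy-based formulation of Dunning's log-likelihood ratio. It is not taken from the Universal Recommender code, and the function names and the meaning assigned to the four counts are my own assumptions:

```scala
// x * ln(x), with the usual convention that 0 * ln(0) = 0.
def xLogX(x: Long): Double = if (x == 0) 0.0 else x * math.log(x.toDouble)

// Unnormalised Shannon entropy of a list of counts.
def entropy(counts: Long*): Double = xLogX(counts.sum) - counts.map(xLogX).sum

// Log-likelihood ratio for a 2x2 contingency table:
//   k11 = rows where both items occur, k12 = only the first item,
//   k21 = only the second item,        k22 = neither.
def logLikelihoodRatio(k11: Long, k12: Long, k21: Long, k22: Long): Double = {
  val rowEntropy = entropy(k11 + k12, k21 + k22)
  val colEntropy = entropy(k11 + k21, k12 + k22)
  val matEntropy = entropy(k11, k12, k21, k22)
  // Guard against tiny negative values caused by floating-point round-off.
  math.max(0.0, 2.0 * (rowEntropy + colEntropy - matEntropy))
}

println(logLikelihoodRatio(1, 1, 1, 1))   // independent counts: approximately 0
println(logLikelihoodRatio(10, 0, 0, 10)) // perfectly associated counts: large score
```

The co-occurrence counts from A'B would supply k11 for each pair; the other three cells follow from the row and column totals.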

Michel Lemay
0

You can borrow from the idea of locality sensitive hashing. Here is one approach:

  • Define a set of hash keys based on your matching requirements. You would use these keys to find potential matches. For example, a possible hash key could be based on the movie actor vector.
  • Perform a reduce for each key. This will give sets of potential matches. For each set of potential matches, perform your "exact match". The exact match will produce sets of exact matches.
  • Run Connected Component algorithm to perform set merge to get the sets of all exact matches.

I have implemented something similar using the above approach.
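
The three steps in the list can be sketched in plain Scala as follows. The actor data, the `exactMatch` placeholder and the small union-find are all hypothetical and only meant to show the shape of the approach, not a Spark implementation:

```scala
// Hypothetical data: movie id -> actor ids. The hash key is each actor id.
val actors: Map[Int, Set[Int]] = Map(
  1 -> Set(10, 11), 2 -> Set(11, 12), 3 -> Set(12), 4 -> Set(20), 5 -> Set(20, 21)
)

// Placeholder for the "exact match" test; here: at least one actor in common.
def exactMatch(a: Int, b: Int): Boolean = (actors(a) & actors(b)).nonEmpty

// Steps 1 and 2: bucket movies by key, then compare only inside each bucket.
val buckets: Iterable[Seq[Int]] = actors.toSeq
  .flatMap { case (movie, cast) => cast.toSeq.map(_ -> movie) }
  .groupBy(_._1).values.map(_.map(_._2))
val matchedPairs = buckets.flatMap { ids =>
  for (a <- ids; b <- ids if a < b && exactMatch(a, b)) yield (a, b)
}

// Step 3: merge matched pairs into connected components with a simple union-find.
val parent = scala.collection.mutable.Map(actors.keys.map(m => m -> m).toSeq: _*)
def find(x: Int): Int =
  if (parent(x) == x) x else { val r = find(parent(x)); parent(x) = r; r }
matchedPairs.foreach { case (a, b) => parent(find(a)) = find(b) }
val components: Set[Set[Int]] = actors.keySet.groupBy(find).values.toSet

println(components) // two groups: movies 1, 2, 3 and movies 4, 5
```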

Hope this helps.

Shirish Kumar
0

Another possible solution would be to use the built-in RowMatrix and its brute-force columnSimilarities method, as explained on the Databricks blog:

https://databricks.com/blog/2014/10/20/efficient-similarity-algorithm-now-in-spark-twitter.html

https://datascience.stackexchange.com/questions/14862/spark-item-similarity-recommendation

Notes:

  • Keep in mind that you will always have N^2 values in the resulting similarity matrix.
  • You will have to concatenate your sparse vectors.
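
The concatenation in the last note amounts to shifting each vector's indices by the total size of the vectors before it. Here is a minimal sketch with a stand-in case class (`SparseVec` and `concat` are hypothetical names, not Spark types). In Spark ML, VectorAssembler performs this kind of concatenation on DataFrame columns:

```scala
// Minimal stand-in for a sparse vector: dimension plus parallel index/value arrays.
final case class SparseVec(size: Int, indices: Array[Int], values: Array[Double])

// Concatenate per-field vectors into one by shifting each vector's indices
// by the total size of the vectors that precede it.
def concat(vectors: Seq[SparseVec]): SparseVec = {
  val offsets = vectors.scanLeft(0)(_ + _.size) // running start position of each vector
  val indices = vectors.zip(offsets).flatMap { case (v, off) => v.indices.map(_ + off).toSeq }
  val values = vectors.flatMap(_.values.toSeq)
  SparseVec(offsets.last, indices.toArray, values.toArray)
}

val genreVec = SparseVec(3, Array(1), Array(1.0))
val actorVec = SparseVec(4, Array(0, 2), Array(1.0, 1.0))
val combined = concat(Seq(genreVec, actorVec))
println(combined.indices.mkString(",")) // 1,3,5
```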
Michel Lemay
0

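One very important suggestion that I have used in similar scenarios: reuse the similarities you already know in order to skip comparisons. Suppose some movies are already known to be similar to each other:

relation     similarity score
A -> B       8/10
B -> C       7/10
C -> D       9/10

If

E -> A       4  // less than some threshold or hyperparameter

then don't calculate similarity for
E -> B
E -> C
E -> D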
donald