
I have a DataFrame of user ratings (from 1 to 5) for movies. To get a DataFrame where the first column is the movie id and the remaining columns are the ratings given to that movie by each user, I do the following:

val ratingsPerMovieDF = imdbRatingsDF
  .groupBy("imdbId")
  .pivot("userId")
  .max("rating")

Now, here I get a DataFrame where most of the values are null, since most users have rated only a few movies.

I'm interested in calculating similarities between those movies (item-based collaborative filtering).

I was trying to assemble a RowMatrix (for further similarity calculations using mllib) from the rating column values. However, I don't know how to deal with the null values.

The following code, where I try to get a Vector for each row,

import org.apache.spark.ml.feature.VectorAssembler

val assembler = new VectorAssembler()
  .setInputCols(ratingsPerMovieDF.columns.filter(_ != "imdbId"))
  .setOutputCol("ratings")

val ratingsDF = assembler.transform(ratingsPerMovieDF).select("imdbId", "ratings")

gives me an error:

Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.

I could replace them with 0s using .na.fill(0), but that would produce incorrect correlation results, since almost all Vectors would become very similar.

Can anyone suggest what to do in this case? The end goal here is to calculate correlations between rows. I was thinking of using SparseVectors somehow (to ignore null values), but I don't know how.

I'm new to Spark and Scala so some of this might make little sense. I'm trying to understand things better.


1 Answer


I believe you are approaching this in the wrong way. Dealing with the nuances of the Spark API is secondary to a proper problem definition: what exactly do you mean by correlation in the case of sparse data?

Filling the data with zeros in the case of explicit feedback (ratings) is problematic not because all Vectors would become very similar (the variation of the metric will be driven by the existing ratings, and results can always be rescaled using a min-max scaler), but because it introduces information which is not present in the original dataset. There is a significant difference between an item which hasn't been rated and an item which has the lowest possible rating.

Overall you can approach this problem in two ways:

  • You can compute pairwise similarity using only entries where both items have non-missing values. This should work reasonably well if the dataset is reasonably dense. It can be expressed as a self-join on the input dataset. In pseudocode:

    imdbRatingsDF.alias("left")
      .join(imdbRatingsDF.alias("right"), Seq("userId"))
      .where($"left.imdbId" =!= $"right.imdbId")
      .groupBy($"left.imdbId", $"right.imdbId")
      .agg(similarity($"left.rating", $"right.rating"))
    

    where similarity implements the required similarity metric (a concrete sketch follows after this list).

  • You can impute the missing ratings, for example using some measure of central tendency. Using the average (Replace missing values with mean - Spark Dataframe) is probably the most natural choice; a minimal sketch of this also follows below.

    More advanced imputation techniques might provide more reliable results, but likely won't scale very well in a distributed system.
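For the first approach, here is a minimal sketch of what the similarity aggregation could look like, using plain cosine similarity computed only over the users who rated both movies (a hypothetical concrete choice; Pearson correlation would additionally center each movie's ratings by their mean). It assumes the column names from the question (userId, imdbId, rating) and a SparkSession in scope as spark; using < instead of =!= keeps each unordered pair of movies only once.

import org.apache.spark.sql.functions.{count, sqrt, sum}

import spark.implicits._  // assumes the SparkSession is available as spark

val similaritiesDF = imdbRatingsDF.alias("left")
  .join(imdbRatingsDF.alias("right"), Seq("userId"))
  .where($"left.imdbId" < $"right.imdbId")            // keep each unordered pair once
  .groupBy($"left.imdbId", $"right.imdbId")
  .agg(
    // cosine similarity restricted to users present on both sides of the join
    (sum($"left.rating" * $"right.rating") /
      (sqrt(sum($"left.rating" * $"left.rating")) *
       sqrt(sum($"right.rating" * $"right.rating")))).alias("cosineSimilarity"),
    count($"userId").alias("coRatings")                // number of co-rating users
  )

Because the aggregation only ever sees rows where both movies have a rating, nothing has to be invented for the missing entries; pairs supported by very few co-rating users can be filtered out afterwards using the coRatings column.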
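For the second approach, assuming Spark 2.2 or later, the built-in Imputer can fill nulls with the per-column mean directly on the pivoted DataFrame from the question. Since its columns are users, this fills each missing entry with that user's average rating (imputing per movie instead would require doing this before the pivot). The _imputed suffix below is only illustrative.

import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions.col

// All columns except imdbId are the per-user rating columns created by the pivot.
val userCols = ratingsPerMovieDF.columns.filter(_ != "imdbId")

// Imputer expects Double/Float input columns, so cast the ratings first.
val numericDF = userCols.foldLeft(ratingsPerMovieDF) { (df, c) =>
  df.withColumn(c, col(c).cast("double"))
}

// Null (and NaN) entries in each user column are replaced by that column's mean.
val imputer = new Imputer()
  .setInputCols(userCols)
  .setOutputCols(userCols.map(_ + "_imputed"))
  .setStrategy("mean")

val imputedDF = imputer.fit(numericDF).transform(numericDF)

The imputed columns can then be fed to the VectorAssembler from the question without hitting the "Values to assemble cannot be null" error.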

Note

Using SparseVectors is essentially equivalent to na.fill(0).

zero323