0

I am writing the movie recommender codes in Pyspark. The Recommendation output from ALS is an array inside the movie_id column and another array inside the rating column. But when I am trying to explode the columns individually into temporary dataframes and then join them using 'user_id' the 'inner' join is resulting in a cartesian product.

user_recs_one = user_recs.where(user_recs.user_id == 1)
user_recs_one.show(truncate=False)

+-------+-------------------------------------------------------+
|user_id|recommendations                                        |
+-------+-------------------------------------------------------+
|1      |[[1085, 6.1223927], [1203, 6.0752907], [745, 5.954721]]|
+-------+-------------------------------------------------------+

user_recs_one
DataFrame[user_id: int, recommendations: array<struct<movie_id:int,rating:float>>]

user_recs_one = user_recs_one.select("user_id", "recommendations.movie_id", "recommendations.rating")
user_recs_one.show(truncate=False)

+-------+-----------------+--------------------------------+
|user_id|movie_id         |rating                          |
+-------+-----------------+--------------------------------+
|1      |[1085, 1203, 745]|[6.1223927, 6.0752907, 5.954721]|
+-------+-----------------+--------------------------------+

user_recs_one
DataFrame[user_id: int, movie_id: array<int>, rating: array<float>]


x = user_recs_one.select("user_id", F.explode(col("movie_id")).alias("movie_id"))
x.show()

+-------+--------+
|user_id|movie_id|
+-------+--------+
|      1|    1085|
|      1|    1203|
|      1|     745|
+-------+--------+

y = user_recs_one.select("user_id", 
F.explode(col("rating")).alias("rating"))
y.show()

+-------+---------+
|user_id|   rating|
+-------+---------+
|      1|6.1223927|
|      1|6.0752907|
|      1| 5.954721|
+-------+---------+

x.join(y, on='user_id', how='inner').show()

+-------+--------+---------+
|user_id|movie_id|   rating|
+-------+--------+---------+
|      1|    1085|6.1223927|
|      1|    1085|6.0752907|
|      1|    1085| 5.954721|
|      1|    1203|6.1223927|
|      1|    1203|6.0752907|
|      1|    1203| 5.954721|
|      1|     745|6.1223927|
|      1|     745|6.0752907|
|      1|     745| 5.954721|
+-------+--------+---------+
  • I think this one should be able to help you: https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows – Shaido Mar 29 '18 at 08:16
  • I saw that answer before posting. But I want to understand why the behavior is like this. And moreover x and y are separate dataframes, why their inner join is turning into cartesian join. – Anindya Saha Mar 29 '18 at 08:28
  • That is because your key is repeated on multiple rows. Since there is no ordering when joining it will give you all possible combinations of the values. See this one for example: https://stackoverflow.com/questions/10939090/inner-join-returning-large-numbers-of-duplicates – Shaido Mar 29 '18 at 09:08
  • 1
    Aha! Yes Indeed! I got what you are saying. Thanks much for clarifying. – Anindya Saha Mar 29 '18 at 10:40

1 Answers1

1

Since my result set was very small, this is what I ended up implementing:

user_recs_one = user_recs_one.select("user_id", "recommendations.movie_id", "recommendations.rating")
user_recs_one.show(truncate=False)

+-------+-----------------+--------------------------------+
|user_id|movie_id         |rating                          |
+-------+-----------------+--------------------------------+
|1      |[1085, 1203, 745]|[6.1223927, 6.0752907, 5.954721]|
+-------+-----------------+--------------------------------+

user_recs_one
DataFrame[user_id: int, movie_id: array<int>, rating: array<float>]

Introduce a Sequence Id:

In order to join the recommended movies and recommended ratings we need to introduce an additional id column. In order to ensure that the values in the id column is increasing we use the monotonically_increasing_id() function. This function is guaranteed to produce increasing numbers but not guaranteed to produce sequential increasing numbers if there are more than 1 partition in the dataframe. So we also repartition the exploded dataframe into 1 partition.

only_movies = user_recs_one.select("user_id", F.explode(col("movie_id")).alias("movie_id"))
only_movies = only_movies.repartition(1).withColumn('id', F.monotonically_increasing_id())
only_movies = only_movies.select('id', 'user_id', 'movie_id')
only_movies.show()

+---+-------+--------+
| id|user_id|movie_id|
+---+-------+--------+
|  0|      1|    1085|
|  1|      1|    1203|
|  2|      1|     745|
+---+-------+--------+

only_ratings = user_recs_one.select("user_id", F.explode(col("rating")).alias("rating"))
only_ratings = only_ratings.repartition(1).withColumn('id', F.monotonically_increasing_id())
only_ratings = only_ratings.select('id', 'user_id', 'rating')
only_ratings.show()

+---+-------+---------+
| id|user_id|   rating|
+---+-------+---------+
|  0|      1|6.1223927|
|  1|      1|6.0752907|
|  2|      1| 5.954721|
+---+-------+---------+

only_movies.join(only_ratings.drop('user_id'), on='id', how='inner').drop('id').show()

+-------+--------+---------+
|user_id|movie_id|   rating|
+-------+--------+---------+
|      1|    1085|6.1223927|
|      1|    1203|6.0752907|
|      1|     745| 5.954721|
+-------+--------+---------+