I have the following datasets:

User Table

+-------+---------+
|user_id|    value|
+-------+---------+
|  user1|[1, 2, 3]|
|  user2|[4, 5, 6]|
|  user3|[7, 8, 9]|
+-------+---------+

Items Table

+---------------+---------------+---------------+
|          item1|          item2|          item3|
+---------------+---------------+---------------+
|[0.5, 0.6, 0.7]|[0.2, 0.3, 0.4]|[0.1, 0.8, 0.9]|
+---------------+---------------+---------------+

I want to generate the following dataset by multiplying Users × Items:

+-------+-----+-----+-----+
|user_id|item1|item2|item3|
+-------+-----+-----+-----+
|  user1|  3.8|    2|  4.4|
|  user2|  9.2|  4.7|  9.8|
|  user3| 14.6|  7.4| 15.2|
+-------+-----+-----+-----+
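Each cell in the target table is the dot product of a user's vector with an item's vector; for example, user1 × item1 = 1·0.5 + 2·0.6 + 3·0.7 = 3.8. A plain-Python sanity check of the arithmetic (no Spark involved; the names here are mine, for illustration only):

```python
# Plain-Python check of the expected table (no Spark).
users = {"user1": [1, 2, 3], "user2": [4, 5, 6], "user3": [7, 8, 9]}
items = {"item1": [0.5, 0.6, 0.7], "item2": [0.2, 0.3, 0.4], "item3": [0.1, 0.8, 0.9]}

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

# Round to one decimal to sidestep floating-point noise.
result = {u: {i: round(dot(uv, iv), 1) for i, iv in items.items()}
          for u, uv in users.items()}
```

This reproduces exactly the numbers in the table above.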

I was initially thinking of using a cross join to get all the fields together and then doing the multiplication row by row and column by column, but that seems like the wrong approach (and a very slow, memory-intensive process).

Is there a better approach I should use?

I'm using Scala and Spark 3.1

David R

2 Answers


I don't know if you can avoid the join that pairs users with items, but for the dot-product calculation you can use arrays_zip and aggregate.

PySpark 3.1+

from pyspark.sql import functions as F

cols = ['item1', 'item2', 'item3']

df = (df_user.join(df_item, how='outer')  # pair every user row with the single items row
      .select('user_id',
              # zip the user vector with each item vector into an array of structs
              *[F.arrays_zip(F.col('value'), c).alias(c) for c in cols])
      .select('user_id',
              # fold each zipped array into a dot product
              *[F.aggregate(c, F.lit(0.0), lambda acc, x: acc + x['value'] * x[c]).alias(c)
                for c in cols]))

Note the sums can show small floating-point rounding errors; this is standard IEEE 754 behavior, not a Spark or Python bug.
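For example, the classic case in plain Python (binary floats cannot represent 0.1 or 0.2 exactly, so sums pick up tiny errors):

```python
# Binary floating point cannot represent 0.1 or 0.2 exactly,
# so their sum is not exactly 0.3.
total = 0.1 + 0.2
print(total)         # 0.30000000000000004
print(total == 0.3)  # False
```

If exact equality matters downstream, round the result or compare within a tolerance.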

Emma

I was able to solve it using Emma's example, adapting it from PySpark to Scala.

Given the inputs (assuming `spark.implicits._` is in scope for `toDF`):

  val items = Seq(
    (Array(0.5, 0.6, 0.7), Array(0.2, 0.3, 0.4), Array(0.1, 0.8, 0.9))
  ).toDF("item1", "item2", "item3")

  val users = Seq(
    ("user1", Array(1, 2, 3)),
    ("user2", Array(4, 5, 6)),
    ("user3", Array(7, 8, 9))
  ).toDF("user_id", "value")

  val cols = Seq("item1", "item2", "item3")

I calculated the expected output as follows:

import org.apache.spark.sql.functions.{col, expr}

users.crossJoin(items)
  .select(col("user_id") +: cols.map { c =>
    // zip_with multiplies element-wise; aggregate folds the products into a sum
    expr(s"aggregate(zip_with(value, `${c}`, (x, y) -> x * y), 0D, (s, x) -> s + x)").as(c)
  }: _*)
  .show()

This outputs:

+-------+-----+-----+-----+
|user_id|item1|item2|item3|
+-------+-----+-----+-----+
|user1  |3.8  |2.0  |4.4  |
|user2  |9.2  |4.7  |9.8  |
|user3  |14.6 |7.4  |15.2 |
+-------+-----+-----+-----+
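In plain terms, `zip_with(value, item, (x, y) -> x * y)` multiplies the two arrays element-wise, and `aggregate(..., 0D, (s, x) -> s + x)` folds the products into a sum. The same computation for user1 × item1, sketched in plain Python (illustrative only, not the Spark API):

```python
from functools import reduce

value = [1, 2, 3]        # user1's vector
item1 = [0.5, 0.6, 0.7]  # item1's vector

# zip_with(value, item1, (x, y) -> x * y)
products = [x * y for x, y in zip(value, item1)]
# aggregate(products, 0D, (s, x) -> s + x)
total = reduce(lambda s, x: s + x, products, 0.0)
```

`total` comes out to 3.8 (up to floating-point rounding), matching the first cell of the output above.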

I have yet to measure the performance of this solution, but at least it works for now.

David R