
I start with the following table:

|date       | first_cat  | second_cat | price_change|
|:----------|:-----------|:-----------|------------:|
|30/05/2022 | old        | test_2     |         0.94|
|31/08/2022 | old        | test_3     |         1.24|
|30/05/2022 | old        | test_2     |         0.90|
|31/08/2022 | old        | test_3     |         1.44|
|30/05/2022 | new        | test_1     |         1.94|
|30/06/2022 | new        | test_4     |         0.54|
|31/07/2022 | new        | test_5     |         1.94|
|30/06/2022 | new        | test_4     |         0.96|
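
For reproducibility, here is a minimal PySpark sketch that builds this sample data and registers it as my_table (dates are kept as plain strings here for simplicity):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the sample rows from the table above
rows = [
    ("30/05/2022", "old", "test_2", 0.94),
    ("31/08/2022", "old", "test_3", 1.24),
    ("30/05/2022", "old", "test_2", 0.90),
    ("31/08/2022", "old", "test_3", 1.44),
    ("30/05/2022", "new", "test_1", 1.94),
    ("30/06/2022", "new", "test_4", 0.54),
    ("31/07/2022", "new", "test_5", 1.94),
    ("30/06/2022", "new", "test_4", 0.96),
]
df = spark.createDataFrame(rows, ["date", "first_cat", "second_cat", "price_change"])
df.createOrReplaceTempView("my_table")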

I want to calculate the product of price_change grouped by date, first_cat and second_cat:

|date       | first_cat  | second_cat | price_aggr  |
|:----------|:-----------|:-----------|------------:|
|30/05/2022 | old        | test_2     |     0.94*0.9|
|31/08/2022 | old        | test_3     |    1.24*1.44|
|30/05/2022 | new        | test_1     |         1.94|
|30/06/2022 | new        | test_4     |    0.54*0.96|
|31/07/2022 | new        | test_5     |         1.94|

I did it with:

SELECT
    date,
    first_cat,
    second_cat,
    array_join(collect_list(price_change), "*") as price_aggr
FROM my_table
GROUP BY
    date,
    first_cat,
    second_cat

However, that puts the expression as text in the table, whereas I would like the expression to be evaluated, so the desired result is as follows:

|date       | first_cat  | second_cat | price_aggr  |
|:----------|:-----------|:-----------|------------:|
|30/05/2022 | old        | test_2     |        0.846|
|31/08/2022 | old        | test_3     |       1.7856|
|30/05/2022 | new        | test_1     |         1.94|
|30/06/2022 | new        | test_4     |       0.5184|
|31/07/2022 | new        | test_5     |         1.94|

I saw some ideas, but they use Pandas and other methods that fall outside Spark SQL: Cumulative product in Spark

I need to do it in Spark SQL alone; I'd like to avoid conversion to Pandas and UDFs.

Many thanks!

Harrv7

1 Answer


You need to aggregate price_change by multiplying all the values within each group. With a UDF and the DataFrame API it's pretty straightforward:

import org.apache.spark.sql.functions.{collect_list, udf}
import spark.implicits._  // for the $"column" syntax

val product = udf { pcs: Seq[Double] => pcs.reduce(_ * _) }
my_table.groupBy($"date", $"first_cat", $"second_cat")
  .agg(product(collect_list($"price_change")).as("price_aggr"))
  .show

You can go with SQL also:

val product = udf { pcs: Seq[Double] => pcs.reduce(_ * _) }
spark.udf.register("product", product)
spark.sql("""
  SELECT date, first_cat, second_cat, product(collect_list(price_change)) AS price_aggr
    FROM my_table
   GROUP BY date, first_cat, second_cat
""").show

Well... if you strictly want to avoid UDFs and care less about readability, this will work too:

SELECT date, first_cat, second_cat,
       exp(sum(ln(price_change))) as price_aggr      
  FROM my_table
 GROUP BY date, first_cat, second_cat;

It makes use of a simple transformation: adding the natural logarithms of two numbers and then exponentiating the sum is equivalent to multiplying them (a quick check follows below). It's not super readable, it only works for strictly positive values, and beware of potential precision loss - your choice.
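
As a quick sanity check of that identity in plain Python, using the first group from the sample data (0.94 and 0.90):

import math

a, b = 0.94, 0.90
# exp(ln(a) + ln(b)) equals a * b, up to floating point rounding
via_logs = math.exp(math.log(a) + math.log(b))
direct = a * b
print(via_logs, direct)  # both approximately 0.846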

Kombajn zbożowy
  • Thanks for the reply! When I try to use the second option, I get an error: `name 'product' is not defined`. Do I need to do anything else in SQL apart from `spark.udf.register("product", product)`? – Harrv7 Sep 27 '22 at 13:33
  • `spark.udf.register` should be enough (I checked with Spark 3.1.2). – Kombajn zbożowy Sep 27 '22 at 13:36
  • It doesn't work. I have Spark version `v3.1.1.3.1.7270.1001-1`. Perhaps it works on your end because you define `product` with `val product = udf{}`? – Harrv7 Sep 27 '22 at 14:11
  • Of course you need to both *define* the UDF and *register* it. It might not have been obvious from the answer, I updated it now. – Kombajn zbożowy Sep 27 '22 at 14:28
  • I see, and is there a way to make that declaration work in a Jupyter Notebook with a `pyspark3` kernel? That definition is not recognized as valid Python syntax. – Harrv7 Sep 27 '22 at 14:37
  • Probably because my code is Scala :) For Python this should work: `import functools; from pyspark.sql.functions import udf; product = udf(lambda pcs: functools.reduce(lambda a, b: a * b, pcs))` – Kombajn zbożowy Sep 27 '22 at 14:57