
I have a Spark DataFrame with an array column that looks like this:

+--------------+
|            x |
+--------------+
| [1, 1, 0, 1] |
| [0, 0, 0, 0] |
| [0, 0, 1, 1] |
| [0, 0, 0, 1] |
|    [1, 0, 1] |
+--------------+

I want to add a new column with another array that contains the cumulative sum of x at each index. The result should look like this:

+--------------+---------------+
|            x | x_running_sum |
+--------------+---------------+
| [1, 1, 0, 1] |  [1, 2, 2, 3] |
| [0, 0, 0, 0] |  [0, 0, 0, 0] |
| [0, 0, 1, 1] |  [0, 0, 1, 2] |
| [0, 0, 0, 1] |  [0, 0, 0, 1] |
|    [1, 0, 1] |     [1, 1, 2] |
+--------------+---------------+

How can I create the x_running_sum column? I've tried using some of the higher-order functions like transform, aggregate, and zip_with, but I haven't found a solution yet.

1 Answer


To compute the cumulative sum, slice the array up to each index position and reduce (sum) the values of each slice:

from pyspark.sql import Row


df = spark.createDataFrame([
  Row(x=[1, 1, 0, 1]),
  Row(x=[0, 0, 0, 0]),
  Row(x=[0, 0, 1, 1]),
  Row(x=[0, 0, 0, 1]),
  Row(x=[1, 0, 1])
])

# For each index in 1..size(x), sum the slice containing the first `index` elements of x
(df
 .selectExpr('x', "TRANSFORM(sequence(1, size(x)), index -> REDUCE(slice(x, 1, index), CAST(0 as BIGINT), (acc, el) -> acc + el)) AS x_running_sum")
 .show(truncate=False))

Output

+------------+-------------+
|x           |x_running_sum|
+------------+-------------+
|[1, 1, 0, 1]|[1, 2, 2, 3] |
|[0, 0, 0, 0]|[0, 0, 0, 0] |
|[0, 0, 1, 1]|[0, 0, 1, 2] |
|[0, 0, 0, 1]|[0, 0, 0, 1] |
|[1, 0, 1]   |[1, 1, 2]    |
+------------+-------------+
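
If REDUCE is not accepted on your Spark version (the first comment below reports this on 2.4.5), the same expression can be written with AGGREGATE and a shorter lambda variable name. This is a sketch, assuming the same df as above and that AGGREGATE is available (Spark 2.4+):

# Identical logic: for each index i, sum the first i elements of x
(df
 .selectExpr('x', "TRANSFORM(sequence(1, size(x)), i -> AGGREGATE(slice(x, 1, i), CAST(0 AS BIGINT), (acc, el) -> acc + el)) AS x_running_sum")
 .show(truncate=False))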
Kafels
    Awesome, thank you! I had to change `REDUCE` to `AGGREGATE` (not sure if REDUCE was introduced in a later pyspark version -- I'm on 2.4.5). I also had to change `index` to `i` (I think index is a reserved SQL keyword). – djcunningham0 Jun 02 '21 at 22:28
  • @Kafels would be great if you could add the pyspark version you are using here – pythonic833 Jun 03 '21 at 11:52
  • Spark version: 3.1.1 – Kafels Jun 03 '21 at 16:47
  • If I see correctly, this algorithm has squared complexity wrt the length of the array. This running sum can be implemented in linear time quite easily with for loops and creating multiple simple columns. I'm not yet able to do this nicely on arrays. Do you have an idea/suggestion? – Quickbeam2k1 Dec 16 '21 at 09:06
  • @Quickbeam2k1 you may get the same result using `rdd` as I did in this [post](https://stackoverflow.com/questions/47364686/pyspark-cumulative-sum-with-reset-condition/64957835#64957835) but what is going to perform faster I couldn't say to you – Kafels Dec 26 '21 at 22:35
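
As a follow-up to the last two comments, here is a minimal sketch of a linear-time per-row alternative: compute the running sum in plain Python over the RDD and convert back to a DataFrame. The column names and the use of itertools.accumulate are my assumptions, not taken from the linked post.

from itertools import accumulate

# Running sum computed once per row in O(n), then back to a DataFrame
running = (df.rdd
           .map(lambda row: (row.x, list(accumulate(row.x))))
           .toDF(['x', 'x_running_sum']))

running.show(truncate=False)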