
I am getting an unexpected error when trying to accumulate a value over a column-dependent number of previous rows using a window function in PySpark.

A Minimal Working Example (MWE) to reproduce the error I am experiencing is the following:

from pyspark.sql import Window
from datetime import datetime

df = sqlContext.createDataFrame( [("A", 0.1, datetime(2020,12,1), 0),
                                  ("A", 2.1, datetime(2020,12,5), 3),
                                  ("A", 1.1, datetime(2020,12,7), 1),
                                  ("A", 3.1, datetime(2020,12,9), 3),
                                 ],
                                 ["id", "value","timestamp", "previous_rows_to_consider"] )
df.show()
# +---+-----+-------------------+-------------------------+
# | id|value|          timestamp|previous_rows_to_consider|
# +---+-----+-------------------+-------------------------+
# |  A|  0.1|2020-12-01 00:00:00|                        0|
# |  A|  2.1|2020-12-05 00:00:00|                        3|
# |  A|  1.1|2020-12-07 00:00:00|                        1|
# |  A|  3.1|2020-12-09 00:00:00|                        3|
# +---+-----+-------------------+-------------------------+

import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy( F.col('timestamp') ).rowsBetween( -F.col('previous_rows_to_consider'),0 )

df = df.withColumn('value_cumsum_on_previous_rows', F.sum('value').over(w) )
df.show()

which yields `ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions`.

I also tried a workaround using rangeBetween instead of rowsBetween, but I get the same error.
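
For reference, the same window definition runs without error when the bound is a plain integer, so the problem seems to be specifically with passing a column (a quick sanity check, not the logic I actually need):

# Sanity check: a hard-coded integer bound is accepted by rowsBetween,
# so the ValueError seems to be triggered by passing a Column as the bound.
w_fixed = Window.partitionBy('id').orderBy( F.col('timestamp') ).rowsBetween(-2, 0)
df.withColumn('value_cumsum_fixed_window', F.sum('value').over(w_fixed)).show()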

As pointed out by @murtihash in a related question, I suspect that rowsBetween/rangeBetween may not accept a column as input at all, but some online resources suggest that at least rangeBetween should (see for instance the overview provided here).

  • Can anyone either explain what is blocking my MWE, or confirm that rangeBetween and rowsBetween only accept integer values as inputs?
  • If the latter is the case, can anyone suggest a workaround to compute the cumulative sum over a column-dependent range/number of rows?

1 Answer


Indeed you can't put columns into range/row windows. As a workaround (inspired by How to calculate rolling sum with varying window sizes in PySpark):

df2 = df.withColumn(
    'value_list',
    # collect all values from the start of the partition up to the current row
    # (the default window frame), then reverse so the current row comes first
    F.reverse(
        F.collect_list('value').over(Window.partitionBy('id').orderBy('timestamp'))
    )
).withColumn(
    'cumsum',
    # keep the current value plus up to previous_rows_to_consider preceding values,
    # then sum them; cast(0 as double) is the initial value of the aggregation
    F.expr('''aggregate(
        slice(value_list, 1, previous_rows_to_consider + 1),
        cast(0 as double),
        (x, y) -> x + y
    )''')
)

df2.show()
+---+-----+-------------------+-------------------------+--------------------+------+
| id|value|          timestamp|previous_rows_to_consider|          value_list|cumsum|
+---+-----+-------------------+-------------------------+--------------------+------+
|  A|  0.1|2020-12-01 00:00:00|                        0|               [0.1]|   0.1|
|  A|  2.1|2020-12-05 00:00:00|                        3|          [2.1, 0.1]|   2.2|
|  A|  1.1|2020-12-07 00:00:00|                        1|     [1.1, 2.1, 0.1]|   3.2|
|  A|  3.1|2020-12-09 00:00:00|                        3|[3.1, 1.1, 2.1, 0.1]|   6.4|
+---+-----+-------------------+-------------------------+--------------------+------+

Note that this solution works for windows with rows preceding. If you need rows following instead, drop F.reverse and change the collect_list window frame to .rowsBetween(Window.currentRow, Window.unboundedFollowing), as sketched below.
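
A rough sketch of that rows-following variant (reusing previous_rows_to_consider as the look-ahead count purely for illustration):

# current row first, then all following rows in the partition
w_following = (Window.partitionBy('id').orderBy('timestamp')
               .rowsBetween(Window.currentRow, Window.unboundedFollowing))

df3 = df.withColumn(
    'value_list',
    F.collect_list('value').over(w_following)
).withColumn(
    'cumsum_following',
    F.expr('''aggregate(
        slice(value_list, 1, previous_rows_to_consider + 1),
        cast(0 as double),
        (x, y) -> x + y
    )''')
)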

  • I was trying to break down the input of `F.expr` by converting it to "pure pyspark" to better understand the single steps. I managed to understand most of what that command does. However, I couldn't convert that step to "pure pyspark" because (1) following [this answer](https://stackoverflow.com/questions/57758729/how-to-dynamically-slice-an-array-column-in-spark), `slice` accepts column inputs only through `F.expr`; (2) I can't find a "pure-pyspark" way to sum up the elements of an array column that does not resort to `F.expr('aggregate(...)')`. – nicolaphee Dec 16 '20 at 14:07
  • Pyspark does not support all Spark SQL functions, and when it does, sometimes it doesn't support using columns as arguments. Unfortunately that's a limitation of pyspark, but fortunately we can always use F.expr – mck Dec 16 '20 at 14:08
  • Also, since I guess it might be useful to somebody who is not familiar with `F.expr` like me, let me add my understanding of its usage here: (1) `slice` takes only the first _previous_rows_to_consider_ + 1 elements of _value_list_, (2) `aggregate`, with the `(x, y) -> x + y` third argument, sums up the elements of the resulting column, (3) I guess the `cast` either specifies the type of the column resulting from the aggregate function or determines how 0/null values are handled. Please let me know if I am mistaken. – nicolaphee Dec 16 '20 at 14:12
  • I think you're right. try it without the cast and see what error you get ;) that will tell you what the cast is for – mck Dec 16 '20 at 14:13
  • Well, I tried but I couldn't really figure that out. It looks like the value inside the `cast` works like a sort of offset (if I change the 0 to a scalar value _x_, the _cumsum_ column turns out to be shifted by _x_). Anyway, everything is fine to me at this point. Thanks a lot [@mck](https://stackoverflow.com/users/14165730/mck)! – nicolaphee Dec 16 '20 at 16:17
  • @nicolaphee yes, that's the initialization value for the aggregate function. it has to be of the same type as the values to be summed (i.e. values in the array), so it needs to be cast to double. – mck Dec 16 '20 at 16:19
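
Following up on the comment thread about a "pure pyspark" version: on newer Spark releases (3.1+), `F.slice` accepts Column arguments and `F.aggregate` takes a Python lambda, so the `F.expr` call can likely be replaced. A sketch under that version assumption (on older versions, `F.expr` remains the way to go):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Sketch assuming Spark >= 3.1; on older versions F.expr is still required.
df2 = df.withColumn(
    'value_list',
    F.reverse(
        F.collect_list('value').over(Window.partitionBy('id').orderBy('timestamp'))
    )
).withColumn(
    'cumsum',
    F.aggregate(
        F.slice(F.col('value_list'), F.lit(1), F.col('previous_rows_to_consider') + 1),
        F.lit(0.0),                 # initial value, a double like the array elements
        lambda acc, x: acc + x      # running sum
    )
)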