I am getting an unexpected error when trying to cumulatively sum a value over a column-dependent number of previous rows using a window function in pyspark.
A Minimal Working Example (MWE) that reproduces the error is the following:
from datetime import datetime
from pyspark.sql import Window

# toy data: one id, a value per timestamp, and the number of previous rows
# over which the value should be accumulated for that row
df = sqlContext.createDataFrame(
    [("A", 0.1, datetime(2020, 12, 1), 0),
     ("A", 2.1, datetime(2020, 12, 5), 3),
     ("A", 1.1, datetime(2020, 12, 7), 1),
     ("A", 3.1, datetime(2020, 12, 9), 3)],
    ["id", "value", "timestamp", "previous_rows_to_consider"])
df.show()
# +---+-----+-------------------+-------------------------+
# | id|value| timestamp|previous_rows_to_consider|
# +---+-----+-------------------+-------------------------+
# | A| 0.1|2020-12-01 00:00:00| 0|
# | A| 2.1|2020-12-05 00:00:00| 3|
# | A| 1.1|2020-12-07 00:00:00| 1|
# | A| 3.1|2020-12-09 00:00:00| 3|
# +---+-----+-------------------+-------------------------+
import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy(F.col('timestamp')).rowsBetween(-F.col('previous_rows_to_consider'), 0)
df = df.withColumn('value_cumsum_on_previous_rows', F.sum('value').over(w))
df.show()
which yields ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
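For what it's worth, the same cumulative sum with a hardcoded integer bound (here 3, chosen just for illustration, as is the column name value_cumsum_fixed) runs without error, so the issue really seems to be the column-valued bound:
w_fixed = Window.partitionBy('id').orderBy(F.col('timestamp')).rowsBetween(-3, 0)
df.withColumn('value_cumsum_fixed', F.sum('value').over(w_fixed)).show()  # works fine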
I also tried a workaround using rangeBetween instead of rowsBetween, but I get the same error.
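For reference, the rangeBetween attempt was roughly the following (the cast of the timestamp to long is just so that the range bound is numeric); it raises the same ValueError, since the bound is still a Column:
w_range = Window.partitionBy('id').orderBy(F.col('timestamp').cast('long')) \
                .rangeBetween(-F.col('previous_rows_to_consider'), 0)
df.withColumn('value_cumsum_on_previous_rows', F.sum('value').over(w_range)).show()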
As pointed out by @murtihash in a related question, I suspect that rowsBetween/rangeBetween do not accept a column as input at all, but on some online resources I found that at least rangeBetween should (see for instance the overview provided here).
- Can anyone either explain what is blocking my MWE or confirm that rangeBetween and rowsBetween only accept integer values as inputs?
- If the latter is the case, can anyone suggest a workaround to compute the cumulative sum over a column-dependent range/number of rows?
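To make the expected output concrete, here is a brute-force sketch that computes the result I am after via an indexed self-join (the names row_idx, cur, prev and expected are just illustrative); I would rather avoid this in favour of a window-based solution:
w_idx = Window.partitionBy('id').orderBy('timestamp')
df_idx = df.withColumn('row_idx', F.row_number().over(w_idx))

# join each row ('cur') with itself and its previous_rows_to_consider predecessors ('prev')
expected = (
    df_idx.alias('cur')
    .join(df_idx.alias('prev'),
          (F.col('cur.id') == F.col('prev.id'))
          & F.col('prev.row_idx').between(
              F.col('cur.row_idx') - F.col('cur.previous_rows_to_consider'),
              F.col('cur.row_idx')))
    .groupBy('cur.id', 'cur.timestamp', 'cur.value', 'cur.previous_rows_to_consider')
    .agg(F.sum('prev.value').alias('value_cumsum_on_previous_rows'))
    .orderBy('timestamp')
)
expected.show()
# expected cumulative sums, in timestamp order (up to float rounding): 0.1, 2.2, 3.2, 6.4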