Can I compute a discounted future cumulative sum using Spark SQL? Below is an example that computes the undiscounted cumulative future sum with window functions; I hard-coded what I mean by the discounted cumulative sum in the `discounted_cum` column:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
def undiscountedCummulativeFutureReward(df):
    """Add an ``undiscounted`` column: the sum of this and all later rewards per user.

    Rows are grouped by ``user`` and ordered by ``time``; the frame runs from the
    current row's ``time`` value to the end of the partition. Because this is a
    range frame (not a rows frame), rows sharing the same ``time`` are all
    included in each other's sums.

    Args:
        df: DataFrame with ``user``, ``time`` and ``reward`` columns.

    Returns:
        ``df`` with an extra ``undiscounted`` column.
    """
    future_frame = (
        Window.partitionBy('user')
        .orderBy('time')
        .rangeBetween(Window.currentRow, Window.unboundedFollowing)
    )
    return df.withColumn('undiscounted', F.sum('reward').over(future_frame))
def _discounted_future_sums(rewards, gamma):
    """Return ``[sum_k gamma**k * rewards[i + k] for each i]`` over the rest of the list."""
    sums = []
    running = 0.0
    # Walk backwards so each value reuses the next one: G_i = r_i + gamma * G_{i+1}.
    for reward in reversed(rewards):
        running = reward + gamma * running
        sums.append(running)
    sums.reverse()
    return sums


def makeData(spark, gamma=0.5):
    """Build a small per-user reward DataFrame with the expected discounted sums.

    ``discounted_cum`` is derived from the rewards (discounting by one factor of
    ``gamma`` per later row of the same user) instead of being typed in by hand,
    so it stays consistent if a reward or ``gamma`` changes.

    Args:
        spark: SparkSession used to create the DataFrame.
        gamma: discount factor applied per subsequent row of the same user.

    Returns:
        DataFrame with columns ``user``, ``time``, ``reward``, ``discounted_cum``.
    """
    # (time, reward) events per user, listed in increasing time order.
    events_by_user = {
        'bob': [(3, 10), (4, 9), (5, 11)],
        'jo': [(4, 6), (5, 7)],
    }
    data = []
    for user, events in events_by_user.items():
        rewards = [reward for _, reward in events]
        discounted = _discounted_future_sums(rewards, gamma)
        for (event_time, reward), disc in zip(events, discounted):
            data.append({'user': user, 'time': event_time,
                         'reward': reward, 'discounted_cum': disc})
    # DoubleType: a float32 column would not match a double computed by Spark
    # for gammas that are not exactly representable (e.g. 0.9).
    schema = T.StructType([T.StructField('user', T.StringType(), False),
                           T.StructField('time', T.IntegerType(), False),
                           T.StructField('reward', T.IntegerType(), False),
                           T.StructField('discounted_cum', T.DoubleType(), False)])
    return spark.createDataFrame(data=data, schema=schema)
def main(spark):
    """Build the sample data, add the undiscounted future sum, display and return it."""
    result = undiscountedCummulativeFutureReward(makeData(spark))
    result.orderBy('user', 'time').show()
    return result
When you run it you get:
+----+----+------+--------------+------------+
|user|time|reward|discounted_cum|undiscounted|
+----+----+------+--------------+------------+
| bob| 3| 10| 17.25| 30|
| bob| 4| 9| 14.5| 20|
| bob| 5| 11| 11.0| 11|
| jo| 4| 6| 9.5| 13|
| jo| 5| 7| 7.0| 7|
+----+----+------+--------------+------------+
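That is, the discounted value for a row is $\sum_{k=0}^{\infty} \gamma^k r_{k}$, where $r_0$ is that row's reward and $r_k$ is the reward $k$ rows later for the same user (the sum stops at the user's last row).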
I'm wondering whether I can compute the discounted column with window functions — e.g. introduce a column with the rank, a literal for gamma, and multiply things together — but it's still not quite clear to me. I suppose I could do it with some kind of UDF, but I think I'd first have to `collect_list` the rewards for each user, return a new list with the discounted cumulative sums, and then explode that list.