
Can I compute a discounted future cumulative sum using Spark SQL? Below is an example that computes the undiscounted future cumulative sum using window functions, and I hard-coded what I mean by the discounted cumulative sum:

from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window


def undiscountedCumulativeFutureReward(df):
    windowSpec = Window \
        .partitionBy('user') \
        .orderBy('time') \
        .rangeBetween(0, Window.unboundedFollowing)

    tot_reward = F.sum('reward').over(windowSpec)

    df_tot_reward = df.withColumn('undiscounted', tot_reward)
    return df_tot_reward


def makeData(spark, gamma=0.5):
    data = [{'user': 'bob', 'time': 3, 'reward': 10, 'discounted_cum': 10 + (gamma * 9) + ((gamma ** 2) * 11)},
            {'user': 'bob', 'time': 4, 'reward': 9, 'discounted_cum': 9 + gamma * 11},
            {'user': 'bob', 'time': 5, 'reward': 11, 'discounted_cum': 11.0},
            {'user': 'jo', 'time': 4, 'reward': 6, 'discounted_cum': 6 + gamma * 7},
            {'user': 'jo', 'time': 5, 'reward': 7, 'discounted_cum': 7.0},
            ]
    schema = T.StructType([T.StructField('user', T.StringType(), False),
                           T.StructField('time', T.IntegerType(), False),
                           T.StructField('reward', T.IntegerType(), False),
                           T.StructField('discounted_cum', T.FloatType(), False)])

    return spark.createDataFrame(data=data, schema=schema)


def main(spark):
    df = makeData(spark)
    df = undiscountedCumulativeFutureReward(df)
    df.orderBy('user', 'time').show()
    return df

When you run it you get:

+----+----+------+--------------+------------+
|user|time|reward|discounted_cum|undiscounted|
+----+----+------+--------------+------------+
| bob|   3|    10|         17.25|          30|
| bob|   4|     9|          14.5|          20|
| bob|   5|    11|          11.0|          11|
|  jo|   4|     6|           9.5|          13|
|  jo|   5|     7|           7.0|           7|
+----+----+------+--------------+------------+

That is, the discounted column is \sum_{k=0}^{\infty} \gamma^k r_k, where r_0 is the reward on the current row, r_1 is that user's next reward, and so on (the sum is effectively finite since there are no rewards past the user's last time).
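For example, with gamma = 0.5, bob's row at time 3 works out to 10 + 0.5*9 + 0.5^2*11 = 10 + 4.5 + 2.75 = 17.25, which matches the value hard-coded in makeData above.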

I'm wondering if I can compute the discounted column with window functions, for example by introducing a column with the rank, a literal with gamma, and multiplying things together, but it's still not quite clear to me how. I suppose I could do it with some kind of UDF, but I think I'd have to first collect the rewards for each user into a list, return a new list with the discounted cumulative sum, and then explode the list.
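Sketching what I have in mind for that UDF route (gamma = 0.5 as above; discounted_sum and future_rewards are names I'm just making up here, and I'm not sure collect_list formally guarantees element order, though it follows the ordered window frame in practice):

GAMMA = 0.5


@F.udf(T.DoubleType())
def discounted_sum(rewards):
    # rewards[0] is the current row's reward, rewards[1] the next one, and so on
    return float(sum((GAMMA ** k) * r for k, r in enumerate(rewards)))


def discountedCumulativeFutureReward(df):
    # all rewards from the current row to the end of each user's history, in time order
    windowSpec = Window \
        .partitionBy('user') \
        .orderBy('time') \
        .rowsBetween(0, Window.unboundedFollowing)

    df = df.withColumn('future_rewards', F.collect_list('reward').over(windowSpec))
    return df.withColumn('discounted_udf', discounted_sum('future_rewards')) \
             .drop('future_rewards')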

MrCartoonology

1 Answer


Suppose you were starting with the following DataFrame:

df.show()
#+----+----+------+
#|user|time|reward|
#+----+----+------+
#| bob|   3|    10|
#| bob|   4|     9|
#| bob|   5|    11|
#|  jo|   4|     6|
#|  jo|   5|     7|
#+----+----+------+

You can join this DataFrame to itself on the user column, and keep only those rows where the time column of the right table is greater than or equal to the time column of the left table. We make this easier by aliasing the DataFrames as l and r.

After the join, you can group by user, time and reward from the left table and aggregate the reward column from the right table. However, a groupBy followed by an orderBy is not guaranteed to preserve the order in which the rewards are collected, so you should use a Window to make the ordering explicit.

from pyspark.sql import Window, functions as f

w = Window.partitionBy("user", "l.time", "l.reward").orderBy("r.time")

df = df.alias("l").join(df.alias("r"), on="user")\
    .where("r.time>=l.time")\
    .select(
        "user",
        f.col("l.time").alias("time"),
        f.col("l.reward").alias("reward"),
        f.collect_list("r.reward").over(w).alias("rewards")
    )

df.show()
#+----+----+------+-----------+
#|user|time|reward|    rewards|
#+----+----+------+-----------+
#|  jo|   4|     6|        [6]|
#|  jo|   4|     6|     [6, 7]|
#|  jo|   5|     7|        [7]|
#| bob|   3|    10|       [10]|
#| bob|   3|    10|    [10, 9]|
#| bob|   3|    10|[10, 9, 11]|
#| bob|   4|     9|        [9]|
#| bob|   4|     9|    [9, 11]|
#| bob|   5|    11|       [11]|
#+----+----+------+-----------+

Now you have all of the elements required to compute your discounted_cum column.

Spark 2.1 and above:

You can use pyspark.sql.functions.posexplode to explode the rewards array along with the index in the list. This will make a new row for each value in the rewards array. Use distinct to drop duplicates that were introduced by using the Window function (instead of groupBy).

We'll call the index k and the reward rk. Now you can apply your discount using pyspark.sql.functions.pow:

gamma = 0.5

df.select("user", "time", "reward", f.posexplode("rewards").alias("k", "rk"))\
    .distinct()\
    .withColumn("discounted", f.pow(f.lit(gamma), f.col("k"))*f.col("rk"))\
    .groupBy("user", "time")\
    .agg(f.first("reward").alias("reward"), f.sum("discounted").alias("discounted_cum"))\
    .show()
#+----+----+------+--------------+
#|user|time|reward|discounted_cum|
#+----+----+------+--------------+
#| bob|   3|    10|         17.25|
#| bob|   4|     9|          14.5|
#| bob|   5|    11|          11.0|
#|  jo|   4|     6|           9.5|
#|  jo|   5|     7|           7.0|
#+----+----+------+--------------+

Older versions of Spark:

For older versions of Spark, you'll have to use row_number() - 1 to get the values for k after using explode. Strictly speaking, the order in which row_number() assigns k within each (user, time) group isn't guaranteed here, since every row in the group shares the same time, but it works on this data:

df.select("user", "time", "reward", f.explode("rewards").alias("rk"))\
    .distinct()\
    .withColumn(
        "k",
        f.row_number().over(Window.partitionBy("user", "time").orderBy("time"))-1
    )\
    .withColumn("discounted", f.pow(f.lit(gamma), f.col("k"))*f.col("rk"))\
    .groupBy("user", "time")\
    .agg(f.first("reward").alias("reward"), f.sum("discounted").alias("discounted_cum"))\
    .show()
#+----+----+------+--------------+
#|user|time|reward|discounted_cum|
#+----+----+------+--------------+
#|  jo|   4|     6|           9.5|
#|  jo|   5|     7|           7.0|
#| bob|   3|    10|         17.25|
#| bob|   4|     9|          14.5|
#| bob|   5|    11|          11.0|
#+----+----+------+--------------+
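As an aside that is not part of the original approach: on Spark 2.4 and above you could also skip both the self-join and the explode by collecting the future rewards into an array over a window and folding that array with the transform and aggregate higher-order SQL functions. This is only a sketch: df here means the original user/time/reward DataFrame shown at the top of the answer, future_rewards is a column name introduced just for this example, and it reuses the f and Window imports from above.

gamma = 0.5

w2 = Window.partitionBy("user").orderBy("time")\
    .rowsBetween(0, Window.unboundedFollowing)

# discounted sum of the future_rewards array: sum over k of gamma^k * future_rewards[k]
discount_expr = (
    "aggregate(transform(future_rewards, (r, k) -> pow({g}, k) * r), "
    "cast(0 as double), (acc, x) -> acc + x)"
).format(g=gamma)

df.withColumn("future_rewards", f.collect_list("reward").over(w2))\
    .withColumn("discounted_cum", f.expr(discount_expr))\
    .drop("future_rewards")\
    .show()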
pault