1

We can find the rolling/moving average of a time series data using window function in pyspark.

The data I am dealing with doesn't have any timestamp column but it does have a strictly increasing column frame_number. Data looks like this.

d = [
    {'session_id': 1, 'frame_number': 1, 'rtd': 11.0, 'rtd2': 11.0,}, 
    {'session_id': 1, 'frame_number': 2, 'rtd': 12.0, 'rtd2': 6.0}, 
    {'session_id': 1, 'frame_number': 3, 'rtd': 4.0, 'rtd2': 233.0}, 
    {'session_id': 1, 'frame_number': 4, 'rtd': 110.0, 'rtd2': 111.0,}, 
    {'session_id': 1, 'frame_number': 5, 'rtd': 13.0, 'rtd2': 6.0}, 
    {'session_id': 1, 'frame_number': 6, 'rtd': 43.0, 'rtd2': 233.0}, 
    {'session_id': 1, 'frame_number': 7, 'rtd': 11.0, 'rtd2': 111.0,}]

df = spark.createDataFrame(d)

+------------+-----+-----+----------+
|frame_number|  rtd| rtd2|session_id|
+------------+-----+-----+----------+
|           1| 11.0| 11.0|         1| 
|           2| 12.0|  6.0|         1|
|           3|  4.0|233.0|         1|
|           4|110.0|111.0|         1|
|           5| 13.0|  6.0|         1|
|           6| 43.0|233.0|         1|
|           7| 11.0|111.0|         1|
+------------+-----+-----+----------+

I want to find the rolling average of the column rtd on the strictly increasing column frame_number.

I am trying something like this (using collect_list).

window_size=2
w = Window.partitionBy("session_id").orderBy("frame_number").rowsBetween(0, window_size)
df_lists = df.withColumn('rtd_list', F.collect_list('rtd').over(w))

+------------+-----+-----+----------+-------------------+
|frame_number|  rtd| rtd2|session_id|           rtd_list|
+------------+-----+-----+----------+-------------------+
|           1| 11.0| 11.0|         1|  [11.0, 12.0, 4.0]|
|           2| 12.0|  6.0|         1| [12.0, 4.0, 110.0]|
|           3|  4.0|233.0|         1| [4.0, 110.0, 13.0]|
|           4|110.0|111.0|         1|[110.0, 13.0, 43.0]|
|           5| 13.0|  6.0|         1| [13.0, 43.0, 11.0]|
|           6| 43.0|233.0|         1|       [43.0, 11.0]|
|           7| 11.0|111.0|         1|             [11.0]|
+------------+-----+-----+----------+-------------------+

And then applying a UDF to get moving average.

windudf = F.udf( lambda v: str(np.nanmean(v)), StringType())
out = df_lists.withColumn("moving_average", windudf("rtd_list"))
+------------+-----+-----+----------+-------------------+------------------+
|frame_number|  rtd| rtd2|session_id|           rtd_list|    moving_average|
+------------+-----+-----+----------+-------------------+------------------+
|           1| 11.0| 11.0|         1|  [11.0, 12.0, 4.0]|               9.0|
|           2| 12.0|  6.0|         1| [12.0, 4.0, 110.0]|              42.0|
|           3|  4.0|233.0|         1| [4.0, 110.0, 13.0]|42.333333333333336|
|           4|110.0|111.0|         1|[110.0, 13.0, 43.0]|55.333333333333336|
|           5| 13.0|  6.0|         1| [13.0, 43.0, 11.0]|22.333333333333332|
|           6| 43.0|233.0|         1|       [43.0, 11.0]|              27.0|
|           7| 11.0|111.0|         1|             [11.0]|              11.0|
+------------+-----+-----+----------+-------------------+------------------+    

Issue with above method is that it cannot define a slide duration for a window. Above method calculates moving average for evrey frame. I wnt to move my window by some amount before finding the average. Any ways to achieve this?

Community
  • 1
  • 1
Prince Bhatti
  • 4,671
  • 4
  • 18
  • 24
  • Generate data using this `d = [{'session_id': 1, 'frame_number': 1, 'rtd': 11.0, 'rtd2': 11.0,}, {'session_id': 1, 'frame_number': 2, 'rtd': 12.0, 'rtd2': 6.0}, {'session_id': 1, 'frame_number': 3, 'rtd': 4.0, 'rtd2': 233.0}, {'session_id': 1, 'frame_number': 4, 'rtd': 110.0, 'rtd2': 111.0,}, {'session_id': 1, 'frame_number': 5, 'rtd': 13.0, 'rtd2': 6.0}, {'session_id': 1, 'frame_number': 6, 'rtd': 43.0, 'rtd2': 233.0}, {'session_id': 1, 'frame_number': 7, 'rtd': 11.0, 'rtd2': 111.0,}] df = spark.createDataFrame(d)` – Prince Bhatti Apr 13 '18 at 01:15
  • can you clarify this phrase `I wnt to move my window by some amount before finding the average`? – Ramesh Maharjan Apr 13 '18 at 01:25
  • Let say if `window_size` is `10`. Then average should be calculated at every `slide` of `5`. i.e, If average is the mean of next 10 frames, then, calculate `average` at frame 0, frame 5, frame 10..so on. – Prince Bhatti Apr 13 '18 at 01:29

1 Answers1

4

Define window:

from pyspark.sql import functions as F

w = F.window(
    F.col("frame_number").cast("timestamp"),
    # Just example
    windowDuration="10 seconds",
    slideDuration="5 seconds",
).alias("window")

(df
    .groupBy(w, F.col("session_id"))
    .avg("rtd", "rtd2")
    .withColumn("window", F.col("window").cast("struct<start:long,end:long>"))
    .orderBy("window.start")
    .show())

# +------+----------+------------------+------------------+       
# |window|session_id|          avg(rtd)|         avg(rtd2)|
# +------+----------+------------------+------------------+
# |[-5,5]|         1|             34.25|             90.25|
# |[0,10]|         1|29.142857142857142|101.57142857142857|
# |[5,15]|         1|22.333333333333332|116.66666666666667|
# +------+----------+------------------+------------------+

Also please don't use collect_list with udf to compute average. It give no benefits and has severe performance implications.

Alper t. Turker
  • 34,230
  • 9
  • 83
  • 115