
I have:

from pyspark.sql import functions as F
from pyspark.sql.window import Window


df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00",'Store 1'),
                            (13, "2017-04-15T12:27:18+00:00",'Store 1'),
                            (25, "2017-05-18T11:27:18+00:00",'Store 1'),
                            (18, "2017-05-19T11:27:18+00:00",'Store 1'),
                            (13, "2017-03-15T12:27:18+00:00",'Store 2'),
                            (25, "2017-05-18T11:27:18+00:00",'Store 2'),
                            (25, "2017-08-18T11:27:18+00:00",'Store 2')],
                        ["dollars", "timestampGMT",'Store'])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
dollars timestampGMT    Store
17  2017-03-10 15:27:18 Store 1
13  2017-04-15 12:27:18 Store 1
25  2017-05-18 11:27:18 Store 1
18  2017-05-19 11:27:18 Store 1
13  2017-03-15 12:27:18 Store 2
25  2017-05-18 11:27:18 Store 2
25  2017-08-18 11:27:18 Store 2

I want the average of dollars over the last 3 months (but only when all of the last 3 consecutive months are present, otherwise 0), grouped by Store, ending up with:

dollars timestampGMT    Store   Last_3_months_Average
17  2017-03-10 15:27:18 Store 1 0
13  2017-04-15 12:27:18 Store 1 0
25  2017-05-18 11:27:18 Store 1 18.25
18  2017-05-19 11:27:18 Store 1 18.25
13  2017-03-15 12:27:18 Store 2 0
25  2017-05-18 11:27:18 Store 2 0
25  2017-08-18 11:27:18 Store 2 0
25  2017-08-19 11:27:18 Store 2 0

Not sure how to approach this problem. Should I group by month first?

  • Does this answer your question? [pyspark: rolling average using timeseries data](https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data) – Lamanus Aug 16 '20 at 15:40
  • @Lamanus Unfortunately that is not enough. It does the rolling average, but I can't use it to set the value to 0 when I don't have data for the last 3 consecutive months. – Victor Hugo Borges Aug 16 '20 at 15:52

1 Answer


Try this. The idea is to turn each timestamp into a consecutive month index, flag (per store) the months whose two preceding calendar months are also present, and compute the 3-month window average only for those rows, keeping 0 everywhere else.

import pyspark.sql.functions as f
from pyspark.sql import Window

# w1: per-store ordering by the month index, used for lag()
w1 = Window.partitionBy('Store').orderBy('month')
# w2: per-store range window over the current month value and the 2 preceding ones
w2 = Window.partitionBy('Store').orderBy('month').rangeBetween(-2, 0)

# Map each timestamp to a consecutive month index, so adjacent calendar months
# differ by exactly 1 (e.g. 2017-03 -> 207, 2017-04 -> 208)
df2 = df.withColumn('month', (f.year('timestampGMT') - 2000) * 12 + f.month('timestampGMT'))
df2.show(10, False)
+-------+-------------------+-------+-----+
|dollars|timestampGMT       |Store  |month|
+-------+-------------------+-------+-----+
|17     |2017-03-10 15:27:18|Store 1|207  |
|13     |2017-04-15 12:27:18|Store 1|208  |
|25     |2017-05-18 11:27:18|Store 1|209  |
|18     |2017-05-19 11:27:18|Store 1|209  |
|13     |2017-03-15 12:27:18|Store 2|207  |
|25     |2017-05-18 11:27:18|Store 2|209  |
|25     |2017-08-18 11:27:18|Store 2|212  |
+-------+-------------------+-------+-----+
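A note on the month index: any integer that increases by exactly 1 per calendar month works, so (year - 2000) * 12 + month is just one convenient choice. An equivalent sketch using months_between (df2_alt is my name, not part of the answer):

df2_alt = df.withColumn(
    'month',
    f.months_between(f.trunc('timestampGMT', 'month'),
                     f.to_date(f.lit('1970-01-01'))).cast('int')
)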

# One row per (Store, month); the last 3 months are consecutive exactly when the
# month value two rows back equals month - 2
df3 = df2.select('Store', 'month').distinct() \
        .withColumn('lag', f.lag('month', 2, 0).over(w1)) \
        .withColumn('condition', f.when(f.col('month') == f.col('lag') + 2, True).otherwise(False))
df3.show(10, False)

+-------+-----+---+---------+
|Store  |month|lag|condition|
+-------+-----+---+---------+
|Store 1|207  |0  |false    |
|Store 1|208  |0  |false    |
|Store 1|209  |207|true     |
|Store 2|207  |0  |false    |
|Store 2|209  |0  |false    |
|Store 2|212  |207|false    |
+-------+-----+---+---------+
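An equivalent way to build the condition, if you prefer counting to lagging, is to count how many distinct months fall inside the same 3-month range window w2 (a sketch of the same check; df3_alt is my name, not the answer's):

df3_alt = df2.select('Store', 'month').distinct() \
    .withColumn('condition', f.count('month').over(w2) == 3)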

# Join the per-month condition back onto the detail rows, then average dollars
# over the 3-month window only where the condition holds, otherwise 0
df2.join(df3, ['Store', 'month'], 'inner') \
  .withColumn('avg', f.when(f.col('condition'), f.avg('dollars').over(w2)).otherwise(0)) \
  .show(10, False)

+-------+-----+-------+-------------------+---+---------+-----+
|Store  |month|dollars|timestampGMT       |lag|condition|avg  |
+-------+-----+-------+-------------------+---+---------+-----+
|Store 1|207  |17     |2017-03-10 15:27:18|0  |false    |0.0  |
|Store 1|208  |13     |2017-04-15 12:27:18|0  |false    |0.0  |
|Store 1|209  |25     |2017-05-18 11:27:18|207|true     |18.25|
|Store 1|209  |18     |2017-05-19 11:27:18|207|true     |18.25|
|Store 2|207  |13     |2017-03-15 12:27:18|0  |false    |0.0  |
|Store 2|209  |25     |2017-05-18 11:27:18|0  |false    |0.0  |
|Store 2|212  |25     |2017-08-18 11:27:18|207|false    |0.0  |
+-------+-----+-------+-------------------+---+---------+-----+
Lamanus
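To get the exact output shape asked for in the question, a final cleanup along these lines should work (the column name Last_3_months_Average comes from the question; result is my name):

result = df2.join(df3, ['Store', 'month'], 'inner') \
    .withColumn('Last_3_months_Average',
                f.when(f.col('condition'), f.avg('dollars').over(w2)).otherwise(f.lit(0.0))) \
    .select('dollars', 'timestampGMT', 'Store', 'Last_3_months_Average')
result.orderBy('Store', 'timestampGMT').show(10, False)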