
Using this sample table:

id  sales  sales_date
 1     10  2020-04-30
 1      6  2020-10-31
 1      9  2020-09-30
 1      2  2021-04-30
 2      8  2020-08-31
 2      7  2020-07-31
 2      3  2021-06-30
 2      2  2021-05-31

I would like to calculate the total sum of sales within a range between two dates.

I assume it would be a window function, partitioned by id and ordered by sales_date. But I don't know how to restrict sum('sales') to the range between two given dates.

win = Window.partitionBy('id').orderBy('sales_date')
df.withColumn('running_sum', sum('sales').over(win.rangeBetween(start_date, end_date)))  # ??

# rangeBetween of start_date and start_date + 1 year

For example:

ID 1 has a start date of 2020-04-30, and I want to get the sum from 2020-04-30 to 2021-04-30.

ID 2 has a start date of 2020-08-31, and the end date would be 2021-08-31.

The question below seems quite close to what I want, but my problem is that each ID can have a different start date and end date for the window sum:

pyspark: rolling average using timeseries data
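
From what I can tell, the approach there looks roughly like this minimal sketch (column names adapted to my table; casting the date to epoch seconds so rangeBetween can take a span in seconds, and using a 365-day span to stand in for one year, are my assumptions):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda i: i * 86400  # rangeBetween operates on the long (seconds) ordering column

win = (Window.partitionBy('id')
       .orderBy(F.col('sales_date').cast('timestamp').cast('long'))
       .rangeBetween(0, days(365)))

# This sums sales within one year after *each row*, not after the
# per-id start date, which is exactly where I am stuck.
df = df.withColumn('running_sum', F.sum('sales').over(win))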

  • Are those `start_date, end_date` a global input? E.g. do you need to get all sales from 2020 to 2023, or is this different per id? Also, what happens if part of an id's transactions falls outside the range? – Islam Elbanna Jun 19 '23 at 12:04
  • @IslamElbanna It will not be a global input, as each ID can have a different `start_date`, and `end_date` is 1 year after `start_date`. The dates are different per ID. Each ID can have sales outside of the range; that is a separate issue. For now I am wondering how this range can be between the given dates. I also thought about calculating this separately for each ID in a for loop and unioning the dataframes at the end – Dametime Jun 19 '23 at 12:07

1 Answer


I think you can do it like this:

  • Use a window function to get the start_date of each id
  • Filter out any row whose sales_date is more than 12 months after that start date (this could be made more precise, down to the day)
  • Group by id and sum the sales

Note: I have changed the input so that the last record falls out of range and is filtered away.

from datetime import datetime

from pyspark.sql.window import Window
from pyspark.sql.functions import first, col, months_between, sum, max

df = spark.createDataFrame([
    (1, 10, datetime.strptime("2020-04-30", '%Y-%m-%d')),
    (1, 6, datetime.strptime("2020-10-31", '%Y-%m-%d')),
    (1, 9, datetime.strptime("2020-09-30", '%Y-%m-%d')),
    (1, 2, datetime.strptime("2021-04-30", '%Y-%m-%d')),
    (2, 7, datetime.strptime("2020-07-31", '%Y-%m-%d')),
    (2, 8, datetime.strptime("2020-08-31", '%Y-%m-%d')),
    (2, 3, datetime.strptime("2021-06-30", '%Y-%m-%d')),
    (2, 2, datetime.strptime("2021-08-30", '%Y-%m-%d')),
], ['id', 'sales', 'sales_date'])

# first() over a per-id window ordered by date gives each id's start date;
# then keep only rows within 12 months of that start date.
win = Window.partitionBy('id').orderBy('sales_date')
df = df.withColumn('order_start_date', first(col('sales_date')).over(win)) \
    .filter(months_between(col('sales_date'), col('order_start_date')) <= 12)

df.show()
+---+-----+-------------------+-------------------+
| id|sales|         sales_date|   order_start_date|
+---+-----+-------------------+-------------------+
|  1|   10|2020-04-30 00:00:00|2020-04-30 00:00:00|
|  1|    9|2020-09-30 00:00:00|2020-04-30 00:00:00|
|  1|    6|2020-10-31 00:00:00|2020-04-30 00:00:00|
|  1|    2|2021-04-30 00:00:00|2020-04-30 00:00:00|
|  2|    7|2020-07-31 00:00:00|2020-07-31 00:00:00|
|  2|    8|2020-08-31 00:00:00|2020-07-31 00:00:00|
|  2|    3|2021-06-30 00:00:00|2020-07-31 00:00:00|
+---+-----+-------------------+-------------------+

# 2021-08-30 was removed
df = df.groupBy('id').agg(
    sum('sales').alias('total_sales'),
    max('sales_date').alias('latest_sales_date')
)

df.show()
+---+-----------+-------------------+
| id|total_sales|  latest_sales_date|
+---+-----------+-------------------+
|  1|         27|2021-04-30 00:00:00|
|  2|         18|2021-06-30 00:00:00|
+---+-----------+-------------------+
  • Well, it works, but I would also like to select the most recent sales_date along with the other columns. How could I do this without putting all the columns in the groupBy? – Dametime Jun 19 '23 at 17:14
  • @Dametime you need to add it as another aggregation. I have updated the answer! – Islam Elbanna Jun 19 '23 at 17:28
  • I would like to ask again: right now I could have data stretching over years. How do I calculate a sum like this for each 12-month period between a start date and an end date? – Dametime Jun 21 '23 at 13:41
  • Not sure if I understand correctly, but this `months_between` function can calculate the number of months even when it is more than a year, so if the difference is 2 years it will return 24 – Islam Elbanna Jun 21 '23 at 15:59
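
Building on that last comment, here is a minimal sketch of one way to handle multi-year data, assuming the df from the answer with order_start_date already computed (the year_bucket column name is my invention): bucket each row by how many whole 12-month periods lie between it and the start date, then aggregate per id and bucket.

from pyspark.sql.functions import floor, months_between, col, sum, max

# Bucket 0 covers months 0-11 after the start date, bucket 1 covers 12-23, etc.
# Note: a row exactly 12 months out lands in bucket 1, unlike the <= 12 filter above.
df = df.withColumn(
    'year_bucket',
    floor(months_between(col('sales_date'), col('order_start_date')) / 12)
)

df.groupBy('id', 'year_bucket').agg(
    sum('sales').alias('total_sales'),
    max('sales_date').alias('latest_sales_date')
).show()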