I have the following data frame in pyspark:
date | user_country | account_type | num_listens |
---|---|---|---|
2022-08-01 | UK | premium | 32 |
2022-08-01 | DE | free | 64 |
2022-08-01 | FR | free | 93 |
2022-08-01 | UK | free | 51 |
2022-08-02 | UK | premium | 26 |
2022-08-02 | FR | free | 34 |
2022-08-02 | DE | free | 29 |
2022-08-02 | DE | premium | 41 |
2022-08-02 | DE | free | 12 |
2022-08-02 | FR | premium | 31 |
2022-08-03 | FR | free | 55 |
2022-08-03 | UK | premium | 38 |
2022-08-03 | UK | premium | 51 |
2022-08-03 | FR | free | 81 |
2022-08-04 | DE | free | 6 |
2022-08-04 | UK | premium | 97 |
2022-08-04 | FR | free | 33 |
2022-08-04 | UK | premium | 41 |
2022-08-04 | FR | premium | 67 |
2022-08-04 | DE | free | 86 |
2022-08-04 | DE | free | 25 |
2022-08-04 | FR | free | 16 |
2022-08-04 | FR | free | 48 |
2022-08-04 | UK | premium | 11 |
2022-08-04 | UK | free | 24 |
2022-08-05 | DE | free | 95 |
2022-08-05 | FR | free | 68 |
2022-08-05 | DE | premium | 23 |
2022-08-05 | UK | free | 79 |
2022-08-05 | UK | free | 41 |
2022-08-05 | DE | premium | 99 |
columns = ["date", "user_country","account_type", "num_listens"]
data = [("2022-08-01", "UK", "premium", "32"),
("2022-08-01", "DE", "free", "64"),
("2022-08-01", "FR", "free", "93"),
("2022-08-01", "UK", "free", "51"),
("2022-08-02", "UK", "premium", "26"),
("2022-08-02", "FR", "free", "34"),
("2022-08-02", "DE", "free", "29"),
("2022-08-02", "DE", "premium", "41"),
("2022-08-02", "DE", "free", "12"),
("2022-08-02", "FR", "premium", "31"),
("2022-08-03", "FR", "free", "55"),
("2022-08-03", "UK", "premium", "38"),
("2022-08-03", "UK", "premium", "51"),
("2022-08-03", "FR", "free", "81"),
("2022-08-04", "DE", "free", "6"),
("2022-08-04", "UK", "premium", "97"),
("2022-08-04", "FR", "free", "33"),
("2022-08-04", "UK", "premium", "41"),
("2022-08-04", "FR", "premium", "67"),
("2022-08-04", "DE", "free", "86"),
("2022-08-04", "DE", "free", "25"),
("2022-08-04", "FR", "free", "16"),
("2022-08-04", "FR", "free", "48"),
("2022-08-04", "UK", "premium", "11"),
("2022-08-04", "UK", "free", "24"),
("2022-08-05", "DE", "free", "95"),
("2022-08-05", "FR", "free", "68"),
("2022-08-05", "DE", "premium", "23"),
("2022-08-05", "UK", "free", "79"),
("2022-08-05", "UK", "free", "41"),
("2022-08-05", "DE", "premium", "99")
]
I'm trying to group this data by user_country and account_type and calculate the median of num_listens for each group. On top of this, I would like to use a sliding time window to restrict the data used for each aggregation. For example, when calculating the median for 2022-08-04, I only want to use data from the ten days prior.
The resulting table should look as follows:
snapshot_date | user_country | account_type | median |
---|---|---|---|
2022-08-06 | UK | premium | 38 |
2022-08-06 | DE | free | 29 |
2022-08-06 | FR | free | 52 |
2022-08-06 | UK | free | 46 |
2022-08-06 | DE | premium | 41 |
2022-08-06 | FR | premium | 49 |
2022-08-05 | UK | premium | 38 |
2022-08-05 | DE | free | 27 |
2022-08-05 | FR | free | 48 |
2022-08-05 | UK | free | 38 |
2022-08-05 | DE | premium | 41 |
2022-08-05 | FR | premium | 49 |
2022-08-04 | UK | premium | 35 |
2022-08-04 | DE | free | 29 |
2022-08-04 | FR | free | 68 |
2022-08-04 | UK | free | 51 |
2022-08-04 | DE | premium | 41 |
2022-08-04 | FR | premium | 31 |
2022-08-03 | UK | premium | 29 |
2022-08-03 | DE | free | 29 |
2022-08-03 | FR | free | 64 |
2022-08-03 | UK | free | 51 |
2022-08-03 | DE | premium | 41 |
2022-08-03 | FR | premium | 31 |
2022-08-02 | UK | premium | 32 |
2022-08-02 | DE | free | 64 |
2022-08-02 | FR | free | 93 |
2022-08-02 | UK | free | 51 |
The value in the first row is the median number of listens for all UK users with a premium account, using data from the previous 10 days. (I only included a small sample of 5 days, so in this specific case the full desired range of 10 days is not available.)
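To make the expected semantics concrete, here is a minimal plain-Python reference (not PySpark) for what I am after. It is only a sketch under assumptions read off the table above: the window covers the 10 days strictly before the snapshot date, the median is exact, and halves are rounded up (63.5 is shown as 64). The function name `rolling_medians` is hypothetical, and only the UK premium and FR free rows up to 2022-08-03 are included, with num_listens already cast to int.

```python
from collections import defaultdict
from datetime import date, timedelta
from decimal import Decimal, ROUND_HALF_UP
from statistics import median

WINDOW_DAYS = 10  # look-back length described in the question

# Subset of the sample data: UK premium and FR free rows up to 2022-08-03.
rows = [
    ("2022-08-01", "UK", "premium", 32),
    ("2022-08-01", "FR", "free", 93),
    ("2022-08-02", "UK", "premium", 26),
    ("2022-08-02", "FR", "free", 34),
    ("2022-08-03", "FR", "free", 55),
    ("2022-08-03", "UK", "premium", 38),
    ("2022-08-03", "UK", "premium", 51),
    ("2022-08-03", "FR", "free", 81),
]


def rolling_medians(rows, window_days=WINDOW_DAYS):
    """Median of num_listens per (snapshot_date, user_country, account_type).

    Assumption: each snapshot uses rows dated within the `window_days` days
    strictly before the snapshot date, so the snapshot day itself is excluded.
    """
    parsed = [(date.fromisoformat(d), c, a, n) for d, c, a, n in rows]
    first = min(p[0] for p in parsed)
    last = max(p[0] for p in parsed)

    out = {}
    snapshot = first + timedelta(days=1)
    # One snapshot per day, up to the day after the last observed date.
    while snapshot <= last + timedelta(days=1):
        start = snapshot - timedelta(days=window_days)
        groups = defaultdict(list)
        for d, country, account, listens in parsed:
            if start <= d < snapshot:
                groups[(country, account)].append(listens)
        for (country, account), values in groups.items():
            exact = Decimal(str(median(values)))
            # Assumption: round halves up, as the expected table appears to do.
            rounded = int(exact.quantize(Decimal("1"), rounding=ROUND_HALF_UP))
            out[(snapshot.isoformat(), country, account)] = rounded
        snapshot += timedelta(days=1)
    return out


medians = rolling_medians(rows)
for key in sorted(medians, reverse=True):
    print(key, medians[key])
```

On this subset, the values for snapshot dates 2022-08-02 to 2022-08-04 agree with the UK premium and FR free rows of the expected table. What I am looking for is the PySpark equivalent of this logic.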
Any help on how this can be achieved in pyspark would be much appreciated. I've been fiddling around with combining a group by with a window function but have been unable to get the desired result.