If I understood the question's author correctly, for each row X in the dataframe we want to go over all rows starting from X (ordered by, e.g., id) and, for each such row Y, compare X.date_1 with Y.date_2. The number of rows Y for which the difference between X.date_1 and Y.date_2 is less than, e.g., 1 week should be added as a column to row X (e.g. X.result).
Unfortunately, window functions provide no way to access the current row's X.date_1 inside the aggregation computed over the frame, so this cannot be achieved with window functions directly.
This seems very similar to this question, where the author tries to do the same thing in Postgres.
There is, however, a way to do it with a bit of cheating: "materialize" the window frame of each row into an array and then perform the needed operations on that array. I'm not sure whether this counts in your view, but it is the only way I can see to solve the problem with the Window API. A possible solution could look like this (assuming we want to count the rows Y that come no earlier than X with respect to id and whose Y.date_2 lies between X.date_1 and X.date_1 + 7 days, both ends inclusive):
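In symbols (the notation is mine, not from the question: $d_1^X$ stands for X.date_1 and $d_2^Y$ for Y.date_2, with the difference measured in days), the value computed for each row X is:

$$
\mathrm{result}(X) = \left|\{\, Y \;:\; \mathrm{id}_Y \ge \mathrm{id}_X,\ 0 \le d_2^Y - d_1^X \le 7 \,\}\right|
$$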
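```python
import datetime

import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Reuse the active session (e.g. `spark` in the pyspark shell) or start a new one.
spark = SparkSession.builder.getOrCreate()

# Sample data from the question: one "|id |date_1|date_2 |" row per line.
rawdata = [l.strip('|').replace('|', ' ').split() for l in '''|0 |2017-01-21|2017-04-01 |
|1 |2017-01-22|2017-04-24 |
|2 |2017-02-23|2017-04-30 |
|3 |2017-02-27|2017-04-30 |
|4 |2017-04-23|2017-05-27 |
|5 |2017-04-29|2017-06-30 |
|6 |2017-06-13|2017-07-05 |
|7 |2017-06-13|2017-07-18 |
|8 |2017-06-16|2017-07-19 |
|9 |2017-07-09|2017-08-02 |
|10 |2017-07-18|2017-08-07 |
|11 |2017-07-28|2017-08-11 |
|12 |2017-07-28|2017-08-13 |
|13 |2017-08-04|2017-08-13 |
|14 |2017-08-13|2017-08-13 |
|15 |2017-08-13|2017-08-13 |
|16 |2017-08-13|2017-08-25 |
|17 |2017-08-13|2017-09-10 |
|18 |2017-08-31|2017-09-21 |
|19 |2017-10-03|2017-09-22 |'''.split('\n')]
# datetime.date.fromisoformat requires Python 3.7+.
data = [(int(d[0]), datetime.date.fromisoformat(d[1]), datetime.date.fromisoformat(d[2])) for d in rawdata]
df = spark.createDataFrame(data, schema='id: bigint, date_1: date, date_2: date')

# Frame: the current row and every following row, ordered by id.
window_spec = Window.orderBy('id').rowsBetween(Window.currentRow, Window.unboundedFollowing)

# Step 1: materialize date_2 of all rows in the frame into an array column.
# Step 2: count the array elements x with 0 <= datediff(x, date_1) <= 7, i.e.
#         X.date_1 <= Y.date_2 <= X.date_1 + 7 days (datediff(end, start) = end - start).
#         The SQL higher-order function filter() needs Spark 2.4 or later.
# The helper array column is dropped afterwards.
new_df = df.withColumn('materialized_frame_date_2', func.collect_list(df['date_2']).over(window_spec)) \
    .withColumn('result', func.expr('size(filter(materialized_frame_date_2, x -> datediff(x, date_1) BETWEEN 0 AND 7))')) \
    .drop('materialized_frame_date_2')
new_df.show()
```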
The result:
```
+---+----------+----------+------+
| id|    date_1|    date_2|result|
+---+----------+----------+------+
|  0|2017-01-21|2017-04-01|     0|
|  1|2017-01-22|2017-04-24|     0|
|  2|2017-02-23|2017-04-30|     0|
|  3|2017-02-27|2017-04-30|     0|
|  4|2017-04-23|2017-05-27|     0|
|  5|2017-04-29|2017-06-30|     0|
|  6|2017-06-13|2017-07-05|     0|
|  7|2017-06-13|2017-07-18|     0|
|  8|2017-06-16|2017-07-19|     0|
|  9|2017-07-09|2017-08-02|     0|
| 10|2017-07-18|2017-08-07|     0|
| 11|2017-07-28|2017-08-11|     0|
| 12|2017-07-28|2017-08-13|     0|
| 13|2017-08-04|2017-08-13|     0|
| 14|2017-08-13|2017-08-13|     2|
| 15|2017-08-13|2017-08-13|     1|
| 16|2017-08-13|2017-08-25|     0|
| 17|2017-08-13|2017-09-10|     0|
| 18|2017-08-31|2017-09-21|     0|
| 19|2017-10-03|2017-09-22|     0|
+---+----------+----------+------+
```
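If you want to double-check these numbers without running Spark, the two steps can be mirrored in plain Python. This is only a verification sketch, not part of the Spark solution: the helper name `frame_counts` and its `max_days` parameter are hypothetical. The frame is a list slice from the current row onward, standing in for `collect_list` over `rowsBetween(Window.currentRow, Window.unboundedFollowing)`, and the count stands in for `size(filter(...))`.

```python
import datetime

# Sample rows from the question: (id, date_1, date_2).
ROWS = [
    (0, "2017-01-21", "2017-04-01"), (1, "2017-01-22", "2017-04-24"),
    (2, "2017-02-23", "2017-04-30"), (3, "2017-02-27", "2017-04-30"),
    (4, "2017-04-23", "2017-05-27"), (5, "2017-04-29", "2017-06-30"),
    (6, "2017-06-13", "2017-07-05"), (7, "2017-06-13", "2017-07-18"),
    (8, "2017-06-16", "2017-07-19"), (9, "2017-07-09", "2017-08-02"),
    (10, "2017-07-18", "2017-08-07"), (11, "2017-07-28", "2017-08-11"),
    (12, "2017-07-28", "2017-08-13"), (13, "2017-08-04", "2017-08-13"),
    (14, "2017-08-13", "2017-08-13"), (15, "2017-08-13", "2017-08-13"),
    (16, "2017-08-13", "2017-08-25"), (17, "2017-08-13", "2017-09-10"),
    (18, "2017-08-31", "2017-09-21"), (19, "2017-10-03", "2017-09-22"),
]


def frame_counts(rows, max_days=7):
    """Hypothetical helper: for each row X (ordered by id), materialize the
    date_2 values of X and all following rows, then count those within
    [X.date_1, X.date_1 + max_days] days, both ends inclusive."""
    parsed = sorted(
        (row_id, datetime.date.fromisoformat(d1), datetime.date.fromisoformat(d2))
        for row_id, d1, d2 in rows
    )
    all_date_2 = [d2 for _, _, d2 in parsed]
    result = {}
    for pos, (row_id, date_1, _) in enumerate(parsed):
        # Like collect_list(date_2) over the frame [currentRow, unboundedFollowing].
        frame = all_date_2[pos:]
        # Like size(filter(frame, x -> datediff(x, date_1) BETWEEN 0 AND max_days)).
        result[row_id] = sum(1 for x in frame if 0 <= (x - date_1).days <= max_days)
    return result


counts = frame_counts(ROWS)
print({k: v for k, v in counts.items() if v})  # → {14: 2, 15: 1}
```

With the default 7-day window only ids 14 and 15 get non-zero counts, which matches the `result` column above.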