My situation is the following:
I have a DataFrame consisting of a time series of symbolic (categorical) values. It looks similar to this:
idx  symbol  partition
0    A       0
1    C       0
2    B       0
3    C       0
4    A       0
5    C       1
6    B       1
7    D       1
8    C       1
9    B       1
My goal is now to apply a sliding window and collect the n leading values into an array.
I achieved this with:
from pyspark.sql import Window
from pyspark.sql.functions import collect_list

sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))
This leads to the following DataFrame:
idx  symbol  partition  sliding
0    A       0          [A, C, B]
1    C       0          [C, B, C]
2    B       0          [B, C, A]
3    C       0          [C, A]
4    A       0          [A]
5    C       1          [C, B, D]
6    B       1          [B, D, C]
7    D       1          [D, C, B]
8    C       1          [C, B]
9    B       1          [B]
So far so good. Because of the way Spark partitions the data, the sliding arrays get shorter towards the end of each partition: the leading rows they would need exist, but in the next partition. For the end of the time series this cannot be avoided, but it would be desirable for the sliding window not to miss any information in the middle (indexes 3 and 4 in this example).
The desired DataFrame would look like this:
idx  symbol  partition  sliding
0    A       0          [A, C, B]
1    C       0          [C, B, C]
2    B       0          [B, C, A]
3    C       0          [C, A, C]
4    A       0          [A, C, B]
5    C       1          [C, B, D]
6    B       1          [B, D, C]
7    D       1          [D, C, B]
8    C       1          [C, B]
9    B       1          [B]
Optimal would be to have overlapping partitions, so that indexes 5 and 6 exist redundantly in both partitions and I can compute the missing sliding windows. Is there any way to achieve this?
With overlapping data the original DataFrame would look like this:
idx  symbol  partition
0    A       0
1    C       0
2    B       0
3    C       0
4    A       0
5    C       0
6    B       0
5    C       1
6    B       1
7    D       1
8    C       1
9    B       1
So basically the first two rows of partition 1 would be copied and appended as the last rows of partition 0.
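One way such an overlap might be built is with a union: copy the first n-1 rows of every partition into the preceding partition, flag the copies, and drop them again after the window has been computed. This is only a minimal sketch under my assumptions that the window length is 3 and that the partition numbers are consecutive integers starting at 0; the flag column is_copy and the names n, copies, overlapped and sliding_full are mine:

from pyspark.sql import functions as F

n = 3  # sliding window length used above

# First n-1 rows of every partition (except partition 0), reassigned
# to the preceding partition and flagged as copies.
rank_window = Window.partitionBy("partition").orderBy("idx")
copies = (df.withColumn("rn", F.row_number().over(rank_window))
            .filter((F.col("rn") < n) & (F.col("partition") > 0))
            .drop("rn")
            .withColumn("partition", F.col("partition") - 1)
            .withColumn("is_copy", F.lit(True)))

overlapped = df.withColumn("is_copy", F.lit(False)).unionByName(copies)

# Compute the sliding window on the overlapped data, then drop the copies.
sliding_full = (overlapped
                .withColumn("sliding", collect_list("symbol").over(sliding_window))
                .filter(~F.col("is_copy"))
                .drop("is_copy"))

On the example data this reproduces the desired DataFrame above, at the cost of duplicating n-1 rows per partition boundary.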
I have also thought about filtering out the partition boundary rows, computing the necessary windows for them locally, and joining the result back onto the original DataFrame afterwards (see the sketch below), but I'd like to have an easier approach.
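For completeness, this is roughly what I mean by that, reusing n, F and the imports from above; boundary, tails, heads, sliding_fixed and patched are again names I made up:

# Group each partition's last n-1 rows with the next partition's first
# n-1 rows under a common "boundary" key and recompute the window there.
w_asc = Window.partitionBy("partition").orderBy("idx")
w_desc = Window.partitionBy("partition").orderBy(F.col("idx").desc())

tails = (df.withColumn("rn", F.row_number().over(w_desc))
           .filter(F.col("rn") < n).drop("rn")
           .withColumn("boundary", F.col("partition")))
heads = (df.withColumn("rn", F.row_number().over(w_asc))
           .filter((F.col("rn") < n) & (F.col("partition") > 0)).drop("rn")
           .withColumn("boundary", F.col("partition") - 1))

boundary_window = (Window.partitionBy("boundary").orderBy("idx")
                   .rowsBetween(Window.currentRow, n - 1))
fixed = (tails.unionByName(heads)
              .withColumn("sliding_fixed", collect_list("symbol").over(boundary_window))
              .filter(F.col("boundary") == F.col("partition"))  # keep only the rows to patch
              .select("idx", "sliding_fixed"))

# Overwrite the truncated arrays in the original result.
patched = (sliding_df.join(fixed, "idx", "left")
                     .withColumn("sliding", F.coalesce("sliding_fixed", "sliding"))
                     .drop("sliding_fixed"))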