I'm trying to collect groups of rows into sliding windows represented as vectors.
Given the example input:
+---+-----+-----+
| id|Label|group|
+---+-----+-----+
| A| T| 1|
| B| T| 1|
| C| F| 2|
| D| F| 2|
| E| F| 3|
| F| T| 3|
| G| F| 3|
| H| T| 3|
+---+-----+-----+
An expected output would be:
window_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
+-----+-------------+-------------+
|group| Windows| Labels|
+-----+-------------+-------------+
| 1| [A, B, '']| [T, T, f]|
| 2| [C, D, '']| [F, F, f]|
| 3| [E, F, G]| [F, T, F]|
| 3| [F, G, H]| [T, F, T]|
+-----+-------------+-------------+
My latest attempt produces tumbling windows without padding. Here's my code:
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [
    ("A", "T", 1),
    ("B", "T", 1),
    ("C", "F", 2),
    ("D", "F", 2),
    ("E", "F", 3),
    ("F", "T", 3),
    ("G", "F", 3),
    ("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])
grouping = 3
w2 = Window.partitionBy('group').orderBy('id')
# Assign each row to a non-overlapping bucket of `grouping` rows,
# which is what produces tumbling rather than sliding windows.
df = df.withColumn('rows', ((F.row_number().over(w2) - 1) / grouping).astype('int'))
df.groupBy('group', 'rows') \
    .agg(F.collect_list('id').alias('Windows'),
         F.collect_list('label').alias('Labels')) \
    .drop('rows') \
    .orderBy('group') \
    .show()
I tried variations of this, e.g. a SQL query like in this case, or a window frame such as ROWS N PRECEDING, but I couldn't get the output I want. Most results on the web deal with temporal sliding windows, whereas I need to slide over rows.
Any help would be greatly appreciated.
EDIT:
I think I found a solution for the padding thanks to this answer.
I still need to organize the rows in sliding windows though...
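For reference, this is the per-group logic I'm after, sketched in plain Python (the function name is mine, and wrapping it per group, e.g. via a UDF or `applyInPandas`, is still an open question, not a working Spark solution): pad any group shorter than one window, then slide over it with the given stride.

```python
def sliding_windows(ids, labels, size=3, stride=1, id_pad='', label_pad='f'):
    """Pad a short group to `size`, then emit sliding windows with `stride`."""
    ids, labels = list(ids), list(labels)
    # Groups shorter than one window get padded, matching the expected
    # output for groups 1 and 2 above.
    if len(ids) < size:
        ids += [id_pad] * (size - len(ids))
        labels += [label_pad] * (size - len(labels))
    # Slide a window of `size` over the (possibly padded) group.
    return [(ids[i:i + size], labels[i:i + size])
            for i in range(0, len(ids) - size + 1, stride)]

print(sliding_windows(['A', 'B'], ['T', 'T']))
# [(['A', 'B', ''], ['T', 'T', 'f'])]
print(sliding_windows(['E', 'F', 'G', 'H'], ['F', 'T', 'F', 'T']))
# [(['E', 'F', 'G'], ['F', 'T', 'F']), (['F', 'G', 'H'], ['T', 'F', 'T'])]
```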