
I'm trying to collect groups of rows into sliding windows represented as vectors.

Given the example input:

+---+-----+-----+
| id|Label|group|
+---+-----+-----+
|  A|    T|    1|
|  B|    T|    1|
|  C|    F|    2|
|  D|    F|    2|
|  E|    F|    3|
|  F|    T|    3|
|  G|    F|    3|
|  H|    T|    3|
+---+-----+-----+

An expected output would be:

windows_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
+-----+-------------+-------------+
|group|      Windows|       Labels|
+-----+-------------+-------------+
|    1|   [A, B, '']|    [T, T, f]|
|    2|   [C, D, '']|    [F, F, f]|
|    3|    [E, F, G]|    [F, T, F]|
|    3|    [F, G, H]|    [T, F, T]|
+-----+-------------+-------------+

My latest attempt produces tumbling windows without padding. Here's my code:

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A", "T", 1),
    ("B", "T", 1),
    ("C", "F", 2),
    ("D", "F", 2),
    ("E", "F", 3),
    ("F", "T", 3),
    ("G", "F", 3),
    ("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])

grouping = 3

w2 = Window.partitionBy('group').orderBy('id')

# bucket index: every `grouping` consecutive rows within a group share the same value,
# which produces tumbling (non-overlapping) windows
df = df.withColumn("rows", ((F.row_number().over(w2) - 1) / grouping).astype('int'))
df.groupBy('group', 'rows')\
  .agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))\
  .drop('rows') \
  .orderBy('group').show()

I looked for variations on this, for example via a SQL query like in this case or with a window frame clause such as ROWS N PRECEDING, but I didn't manage to get what I want. Most results on the web focus on time-based sliding windows, whereas I need to slide over rows instead.
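
For illustration, a row-based frame along those lines looks roughly like this (a sketch; it yields one overlapping window per row, but gives no control over the stride and no padding):

df.createOrReplaceTempView('t')
spark.sql("""
    SELECT `group`,
           collect_list(id)    OVER (PARTITION BY `group` ORDER BY id
                                     ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS Windows,
           collect_list(label) OVER (PARTITION BY `group` ORDER BY id
                                     ROWS BETWEEN CURRENT ROW AND 2 FOLLOWING) AS Labels
    FROM t
""").show(truncate=False)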

Any help would be greatly appreciated.

EDIT:
I think I found a solution for the padding thanks to this answer.
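
Presumably it boils down to padding the collected array and then slicing it back to the window size, along these lines (a sketch with placeholder names; array_repeat, concat and slice are the relevant functions):

from pyspark.sql import functions as F

window_size = 3

# pad the array column up to `window_size` elements with '', then cut it back to that length
padded_windows = F.slice(
    F.concat(F.col('Windows'), F.array_repeat(F.lit("''"), window_size - 1)),
    1, window_size
)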

I still need to organize the rows in sliding windows though...

  • I found this other [answer](https://stackoverflow.com/a/67117188/13762577) that might help me, but I have yet to make something usable. I'll post an answer if I manage to solve it. – Voxeldoodle Jan 13 '23 at 10:09

2 Answers


One possible solution (not the most elegant one, but still functional) is the following.
The key is the window definition: .rowsBetween(0, size - 1) creates a frame that starts at the current row (0) and extends over the next size - 1 rows, i.e. a sliding window of the specified size.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 3
id_padding = "''"   # literal two-character string, so the padding shows up as '' in the output
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

(df.select(
  'group',
  F.collect_list('id').over(w_ordered_limited).alias('Windows'),
  F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
  F.count('group').over(w).alias('n'),
  F.row_number().over(w_ordered).alias('n_row')
  )
  # pad arrays and then slice them to the desired `size`
  .withColumn('Windows', F.when(F.col('n') < size, F.slice(F.concat('Windows', F.array_repeat(F.lit(id_padding), size - 1)), 1, size))
                          .otherwise(F.col('Windows')))
  .withColumn('Groups',  F.when(F.col('n') < size, F.slice(F.concat('Groups', F.array_repeat(F.lit(label_padding), size - 1)), 1, size))
                          .otherwise(F.col('Groups')))
  # filter out useless rows
  .filter( ((F.col('n') < size) & (F.col('n_row') == 1)) 
           | ((F.col('n') >= size) & (F.size('Windows') == size)))
  .drop('n', 'n_row')
 ).show()

+-----+----------+---------+
|group|   Windows|   Groups|
+-----+----------+---------+
|    1|[A, B, '']|[T, T, f]|
|    2|[C, D, '']|[F, F, f]|
|    3| [E, F, G]|[F, T, F]|
|    3| [F, G, H]|[T, F, T]|
+-----+----------+---------+

I suggest you go through the solution step by step, one line at a time, to understand the logic behind it.
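
For example, running only the initial select (before the padding and filtering) makes the helper columns visible:

# inspect the intermediate result: one overlapping window per row, plus the
# partition size `n` and row number `n_row` that the filter relies on
(df.select(
    'group',
    F.collect_list('id').over(w_ordered_limited).alias('Windows'),
    F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
    F.count('group').over(w).alias('n'),
    F.row_number().over(w_ordered).alias('n_row'))
  .orderBy('group', 'n_row')
  .show())
# groups 1 and 2 have n < size, so only their first (short) window is kept and padded;
# group 3 has n >= size, so only the windows that reach the full size survive the filter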

  • Thank you very much! It seems to be working. I will certainly study how it works cause it's an interesting solution. – Voxeldoodle Jan 13 '23 at 11:02

To expand on @Ric S's answer, I also needed to account for the stride.

My solution was to adapt the filter condition and the padding transformation based on the stride value and its ratio to the window size:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# parameters
size = 5
stride = 2
id_padding = "''"   # literal two-character string used as padding
label_padding = 'f'

# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)

# with a stride > 1, keep only every `stride`-th window that reaches the full size
if stride == 1:
  filter_cond = (F.col('n') >= size) & (F.size('Windows') == size)
else:
  filter_cond = (F.col('n') >= size) & (F.col('n_row') < size) & (F.col('n_row') % stride == 1) & (F.size('Windows') == size)

# padding/truncation transformation (note: it pads both columns with `id_padding`)
if size % stride != 0:
  transf = lambda dfcol: F.slice(F.concat(dfcol, F.array_repeat(F.lit(id_padding), size - 1)), 1, size)
else:
  transf = lambda dfcol: F.when(F.col('n') < size, F.slice(F.concat(dfcol, F.array_repeat(F.lit(id_padding), size - 1)), 1, size)) \
                          .otherwise(F.col(dfcol))

(df.select(
  'group',
  F.collect_list('id').over(w_ordered_limited).alias('Windows'),
  F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
  F.count('group').over(w).alias('n'),
  F.row_number().over(w_ordered).alias('n_row')
  )
  # pad arrays and then slice them to the desired `size`
  .withColumn('Windows', transf("Windows"))
  .withColumn('Groups',  transf("Groups"))
  # filter out useless rows
  .filter( ((F.col('n') < size) & (F.col('n_row') == 1)) 
           | (filter_cond))
  .drop('n', 'n_row')
 ).show()

With a slightly bigger input than the one given in the question, the output is:

+-----+------------------+------------------+
|group|           Windows|            Groups|
+-----+------------------+------------------+
|    1|[A, B, '', '', '']|[T, T, '', '', '']|
|    2|[C, D, '', '', '']|[F, F, '', '', '']|
|    3|   [E, F, G, H, I]|   [F, T, F, T, T]|
|    3|  [G, H, I, J, '']|  [F, T, T, T, '']|
+-----+------------------+------------------+
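
Note that transf always pads with id_padding, which is why the Groups column above shows '' instead of label_padding. If label-specific padding is wanted, the padding value could be passed in as a parameter, e.g. (a sketch, not part of the original answer):

# hypothetical variant: build the transformation with a configurable padding value
def make_transf(padding):
    pad = F.array_repeat(F.lit(padding), size - 1)
    if size % stride != 0:
        return lambda dfcol: F.slice(F.concat(dfcol, pad), 1, size)
    return lambda dfcol: (F.when(F.col('n') < size,
                                 F.slice(F.concat(dfcol, pad), 1, size))
                           .otherwise(F.col(dfcol)))

# ...then in the chain above:
# .withColumn('Windows', make_transf(id_padding)('Windows'))
# .withColumn('Groups',  make_transf(label_padding)('Groups'))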