
My situation is the following: I have a DataFrame consisting of a time series of symbolic (categorical) values. It looks similar to this:

    idx    symbol    partition
    0      A         0
    1      C         0
    2      B         0
    3      C         0
    4      A         0
    5      C         1
    6      B         1
    7      D         1
    8      C         1
    9      B         1
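
For reference, the example data can be reproduced with something like the following (a minimal sketch; the SparkSession setup is an assumption, not part of the original post):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample time series of categorical symbols with a precomputed partition id
data = [
    (0, "A", 0), (1, "C", 0), (2, "B", 0), (3, "C", 0), (4, "A", 0),
    (5, "C", 1), (6, "B", 1), (7, "D", 1), (8, "C", 1), (9, "B", 1),
]
df = spark.createDataFrame(data, ["idx", "symbol", "partition"])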

My goal now is to apply a sliding window and collect the n leading values into an array.

I achieved this by:

from pyspark.sql import Window
from pyspark.sql.functions import collect_list
sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

This leads to the following DataFrame:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0               [C, A]
    4      A         0                  [A]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

So far so good. Because of the partitioned nature of Spark, the sliding arrays get shorter towards the end of a partition: the leading rows they need live in another partition. At the very end of the time series that cannot be avoided, but it would be desirable for the sliding window not to miss any information in the middle (indexes 3 and 4 in this example).

The desired DataFrame would look like this:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0            [C, A, C]
    4      A         0            [A, C, B]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

Ideally the partitions would overlap, so that indexes 5 and 6 exist redundantly in both partitions and I can compute the missing sliding windows. Is there any way to achieve this?

With overlapping data, the original DataFrame would look like this:

    idx    symbol    partition    
    0      A         0        
    1      C         0        
    2      B         0        
    3      C         0        
    4      A         0
    5      C         0
    6      B         0
    5      C         1        
    6      B         1        
    7      D         1        
    8      C         1           
    9      B         1              

So basically the first two rows of partition 1 would be copied and appended as the last rows of partition 0.

I thought about filtering out the partition boundary rows, calculating the necessary information for them locally, and then joining back onto the original DataFrame, but I'd prefer an easier approach.
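
One way to realize this idea, as a rough sketch under a few assumptions (a window size of 3, consecutive idx values, partitions that split the time series contiguously, and every partition holding at least 3 rows; sliding_df is the result from above, and all helper names are made up):

from pyspark.sql import Window
from pyspark.sql import functions as F

n = 3  # window size

# The last (n - 1) rows of each partition have cut-off windows; together with
# the first (n - 1) rows of each partition they contain everything needed to
# recompute them.
fwd = Window.partitionBy("partition").orderBy("idx")
rev = Window.partitionBy("partition").orderBy(F.desc("idx"))
boundary = (
    df.withColumn("fwd_rn", F.row_number().over(fwd))
      .withColumn("rev_rn", F.row_number().over(rev))
      .where((F.col("fwd_rn") < n) | (F.col("rev_rn") < n))
)

# The boundary subset is tiny, so an unpartitioned window over it is affordable.
fix_w = Window.orderBy("idx").rowsBetween(Window.currentRow, n - 1)
fixes = (
    boundary
      .withColumn("sliding_fix", F.collect_list("symbol").over(fix_w))
      .where(F.col("rev_rn") < n)  # keep only the rows whose window was cut off
      .select("idx", "sliding_fix")
)

# Join the repaired arrays back and prefer them where they exist.
repaired = (
    sliding_df.join(fixes, on="idx", how="left")
              .withColumn("sliding", F.coalesce(F.col("sliding_fix"), F.col("sliding")))
              .drop("sliding_fix")
)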


1 Answer


In your example, if you simply don't partition the window, it gives you what you want:

sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

Gives

    idx    symbol    block    sliding
    0      A         0        [A, C, B]
    1      C         0        [C, B, C]
    2      B         0        [B, C, A]
    3      C         0        [C, A, C]
    4      A         0        [A, C, B]
    5      C         1        [C, B, D]
    6      B         1        [B, D, C]
    7      D         1        [D, C, B]
    8      C         1           [C, B]
    9      B         1              [B]

Also, be careful: collect_list() doesn't respect order (due to the distributed nature of Spark), so your symbols could get mixed up in the list.

  • The thing is that I explicitly want to perform the sliding window in a distributed way (that's why I created the "block" column: to keep the partitioning during the window aggregation). Not defining a partition would move all data to a single partition, which is not scalable. – Yanikovic Jul 31 '19 at 10:41
  • You could use this trick to duplicate a row: https://stackoverflow.com/questions/50624745/pyspark-how-to-duplicate-a-row-n-time-in-dataframe To identify which rows you need to duplicate, you can add a column with max("idx").over(Window.partitionBy("block")) and duplicate those where idx == max or idx == max-1 – Josselin G. Jul 31 '19 at 11:44
  • I thought about this as well, but just duplicating the rows (5 and 6) doesn't bring them to the correct partition. They would be in block 1 but I need them in block 0 to compute the sliding window. – Yanikovic Jul 31 '19 at 12:44
  • I assume you use repartition() in your code beforehand? If you can't move it after this block of code, I think it's a lost cause. – Josselin G. Jul 31 '19 at 12:53
  • There is no repartition call prior to this. The block column is created beforehand with the spark_partition_id function. It is intended to keep the existing partitioning when the window function is used. – Yanikovic Jul 31 '19 at 13:17
  • An idea I have is to somehow assign the new duplicated rows to block 0 so that I can call repartition and Spark moves them around. The question then would be how to determine which block should be assigned to the new rows. – Yanikovic Jul 31 '19 at 13:23
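
To make that idea concrete, here is a rough sketch (assumptions that are not from the post: df already carries the block column built with spark_partition_id, block ids are consecutive integers starting at 0, and the window size is 3). It duplicates the first rows of every block, relabels the copies as belonging to the previous block, unions them back in, runs the original per-block window, and finally drops the helper copies:

from pyspark.sql import Window
from pyspark.sql import functions as F

n = 3  # window size

# First (n - 1) rows of every block except block 0, relabelled as helper
# copies that belong to the previous block.
fwd = Window.partitionBy("block").orderBy("idx")
copies = (
    df.withColumn("rn", F.row_number().over(fwd))
      .where((F.col("rn") < n) & (F.col("block") > 0))
      .drop("rn")
      .withColumn("block", F.col("block") - 1)
      .withColumn("is_copy", F.lit(True))
)

overlapped = df.withColumn("is_copy", F.lit(False)).unionByName(copies)

# The shuffle triggered by partitionBy("block") co-locates the copies with
# their new block, so every block now sees the leading rows it was missing.
sliding_window = Window.partitionBy("block").orderBy("idx").rowsBetween(Window.currentRow, n - 1)
result = (
    overlapped
      .withColumn("sliding", F.collect_list("symbol").over(sliding_window))
      .where(~F.col("is_copy"))  # drop the helper copies again
      .drop("is_copy")
)

Note that the partitioned window still shuffles by the block column, but it never collapses everything onto a single partition the way an unpartitioned window would.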