I have a dataframe with the following schema:

  1. epoch - long (the epoch of an event rounded to the nearest minute)
  2. client_id - string (self-explanatory)
  3. volume - long (the count of how many events occurred in that minute)

I want to add a column (called prev-1h-3m-interval-median), partitioned by client_id: for each minute, look at the previous 60 minutes, sum each run of 3 consecutive minutes (0-3, 3-6, ..., 57-60 of the lookback window), and take the median of those 20 sums.

EDIT - added example for a single client_id

epoch volume
1 30
2 77
3 73
4 57
5 6
6 37
7 75
8 44
9 50
10 65
11 97
12 84
13 18
14 19
15 71
16 46
17 88
18 12
19 24
20 35

This example can be created by the following code:

data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
        (11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
schema = ['epoch', 'volume']
df = spark.createDataFrame(data, schema=schema)

This example contains fewer data points, but the logic is the same. Here I want to create a column named prev-6-2-interval-median: the median of the sums of each 2 consecutive rows within the previous 6 rows. Epochs 1-6 would have null values because they don't have 6 previous rows.
For epoch 7, the calculation would be
median((30+77), (73+57), (6+37)) = median(107, 130, 43) = 107
For epoch 8, the calculation would be
median((77+73), (57+6), (37+75)) = median(150, 63, 112) = 112
and so on.
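
For reference, here is the same calculation as a small plain-Python sketch (a hypothetical helper, only meant to sanity-check the expected values above):

import statistics

volumes = [30, 77, 73, 57, 6, 37, 75, 44, 50, 65,
           97, 84, 18, 19, 71, 46, 88, 12, 24, 35]

def prev_6_2_median(vals, i):
    # i is a 0-based index; the first 6 epochs have no full 6-row lookback
    if i < 6:
        return None
    window = vals[i - 6:i]  # previous 6 volumes, oldest first
    sums = [window[j] + window[j + 1] for j in range(0, 6, 2)]  # 3 pair sums
    return statistics.median(sums)

for i, v in enumerate(volumes):
    print(i + 1, v, prev_6_2_median(volumes, i))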
So the output I'm trying to achieve would look like this:
epoch volume prev-6-2-interval-median
1 30 null
2 77 null
3 73 null
4 57 null
5 6 null
6 37 null
7 75 107
8 44 112
9 50 119
10 65 94
11 97 115
12 84 112
13 18 119
14 19 102
15 71 115
16 46 102
17 88 117
18 12 102
19 24 100
20 35 90
How can I achieve this? A combination of window functions and/or a udf/pandas_udf?

Mr T.
  • A sample dataframe vs. an expected output which is reproducible would be better :) At least it would be faster for us to answer – anky Aug 11 '21 at 17:07
  • @anky - Thank you for your comment - I amended the question to contain a detailed example. – Mr T. Aug 11 '21 at 22:13
  • 1
    Thank you, however images are very difficult to reproduce. You can take some help from here: https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-examples – anky Aug 12 '21 at 05:41
  • @anky - please see updated question – Mr T. Aug 12 '21 at 16:16
  • @MrT. did my answer help, or were you looking for something different? – samkart Aug 16 '21 at 08:38

1 Answer

I've worked out a possible way without any UDFs. (Explanation after the code)

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

sdf. \
    withColumn('flag_odd', (func.col('epoch')%2 != 0).cast('int')). \
    withColumn('flag_even', (func.col('epoch')%2 == 0).cast('int')). \
    withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). \
    withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
    withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). \
    withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). \
    withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). \
    withColumn('median_val', func.sort_array('all_collections')[1]). \
    show()

# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected|  odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |    1|    30|       1|        0|               30|          null|           null|           null|      null|
# |    2|    77|       0|        1|              107|          null|           null|           null|      null|
# |    3|    73|       1|        0|              150|          null|           null|           null|      null|
# |    4|    57|       0|        1|              130|          null|           null|           null|      null|
# |    5|     6|       1|        0|               63|          null|           null|           null|      null|
# |    6|    37|       0|        1|               43|          null|           null|           null|      null|
# |    7|    75|       1|        0|              112|          null| [107, 130, 43]| [107, 130, 43]|       107|
# |    8|    44|       0|        1|              119|[150, 63, 112]|           null| [150, 63, 112]|       112|
# |    9|    50|       1|        0|               94|          null| [130, 43, 119]| [130, 43, 119]|       119|
# |   10|    65|       0|        1|              115| [63, 112, 94]|           null|  [63, 112, 94]|        94|
# |   11|    97|       1|        0|              162|          null| [43, 119, 115]| [43, 119, 115]|       115|
# |   12|    84|       0|        1|              181|[112, 94, 162]|           null| [112, 94, 162]|       112|
# |   13|    18|       1|        0|              102|          null|[119, 115, 181]|[119, 115, 181]|       119|
# |   14|    19|       0|        1|               37|[94, 162, 102]|           null| [94, 162, 102]|       102|
# |   15|    71|       1|        0|               90|          null| [115, 181, 37]| [115, 181, 37]|       115|
# |   16|    46|       0|        1|              117|[162, 102, 90]|           null| [162, 102, 90]|       102|
# |   17|    88|       1|        0|              134|          null| [181, 37, 117]| [181, 37, 117]|       117|
# |   18|    12|       0|        1|              100|[102, 90, 134]|           null| [102, 90, 134]|       102|
# |   19|    24|       1|        0|               36|          null| [37, 117, 100]| [37, 117, 100]|       100|
# |   20|    35|       0|        1|               59| [90, 134, 36]|           null|  [90, 134, 36]|        90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+

I've used the data you provided:

data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
        (11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]

sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])

# +-----+------+
# |epoch|volume|
# +-----+------+
# |    1|    30|
# |    2|    77|
# |    3|    73|
# |    4|    57|
# |    5|     6|
# +-----+------+
  • From there, I create the odd and even record identifiers. They'll help us create the collections. Note that I've assumed that epoch is continuous and ordered.
  • Then, I sum pairs of volume values: epoch 2's volume and epoch 1's volume, epoch 3's volume and epoch 2's volume, and so on.
  • Creating the collection - I've used the sql function collect_list() with a when() condition within it. For every odd epoch, you collect the previous 3 even epochs' pair sums (from step 2). E.g., for epoch 7 you'll need epoch 2's, epoch 4's, and epoch 6's pair sums, i.e. [107, 130, 43]. A similar approach works for even epochs.
  • Once you have the collection, the median is its middle value. The collection, being a list of 3 values, can be sorted and the second element extracted. E.g. [107, 130, 43] sorts to [43, 107, 130], and 107 is extracted as [43, 107, 130][1].
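
Scaling this to the original question (previous 60 minutes, sums of 3-minute buckets, partitioned by client_id), the odd/even trick can be replaced by collecting the previous 60 volumes and summing triples with a higher-order function. A rough, untested sketch, assuming Spark 2.4+ (for transform/sequence), one gap-free row per minute per client, and that the median of 20 sums is the average of the two middle values:

from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# previous 60 rows per client, oldest first (assumes no missing minutes)
w = wd.partitionBy('client_id').orderBy('epoch').rowsBetween(-60, -1)

# bucket_sums: 20 sums of 3 consecutive minutes; stays null until a full hour
# of history exists, so early rows get a null median.
sdf. \
    withColumn('prev_60', func.collect_list('volume').over(w)). \
    withColumn('bucket_sums',
               func.when(func.size('prev_60') == 60,
                         func.expr('transform(sequence(0, 19), i -> prev_60[3*i] + prev_60[3*i+1] + prev_60[3*i+2])'))). \
    withColumn('prev-1h-3m-interval-median',
               func.expr('(sort_array(bucket_sums)[9] + sort_array(bucket_sums)[10]) / 2')). \
    drop('prev_60', 'bucket_sums'). \
    show()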
samkart