I've worked out a possible way without any UDFs (explanation after the code).
# imports assumed by the snippet, matching the func/wd aliases used throughout
from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

sdf. \
withColumn('flag_odd', (func.col('epoch') % 2 != 0).cast('int')). \
withColumn('flag_even', (func.col('epoch') % 2 == 0).cast('int')). \
withColumn('sum_curr_and_prev', func.sum('volume').over(wd.orderBy('epoch').rowsBetween(-1, 0))). \
withColumn('even_collected', func.collect_list(func.when(func.col('flag_odd') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
withColumn('odd_collected', func.collect_list(func.when(func.col('flag_even') == 1, func.col('sum_curr_and_prev'))).over(wd.orderBy('epoch').rowsBetween(-6, -1))). \
withColumn('even_collected', func.when((func.col('flag_even') == 1) & (func.col('epoch') > 6), func.col('even_collected'))). \
withColumn('odd_collected', func.when((func.col('flag_odd') == 1) & (func.col('epoch') > 6), func.col('odd_collected'))). \
withColumn('all_collections', func.coalesce('even_collected', 'odd_collected')). \
withColumn('median_val', func.sort_array('all_collections')[1]). \
show()
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# |epoch|volume|flag_odd|flag_even|sum_curr_and_prev|even_collected| odd_collected|all_collections|median_val|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
# | 1| 30| 1| 0| 30| null| null| null| null|
# | 2| 77| 0| 1| 107| null| null| null| null|
# | 3| 73| 1| 0| 150| null| null| null| null|
# | 4| 57| 0| 1| 130| null| null| null| null|
# | 5| 6| 1| 0| 63| null| null| null| null|
# | 6| 37| 0| 1| 43| null| null| null| null|
# | 7| 75| 1| 0| 112| null| [107, 130, 43]| [107, 130, 43]| 107|
# | 8| 44| 0| 1| 119|[150, 63, 112]| null| [150, 63, 112]| 112|
# | 9| 50| 1| 0| 94| null| [130, 43, 119]| [130, 43, 119]| 119|
# | 10| 65| 0| 1| 115| [63, 112, 94]| null| [63, 112, 94]| 94|
# | 11| 97| 1| 0| 162| null| [43, 119, 115]| [43, 119, 115]| 115|
# | 12| 84| 0| 1| 181|[112, 94, 162]| null| [112, 94, 162]| 112|
# | 13| 18| 1| 0| 102| null|[119, 115, 181]|[119, 115, 181]| 119|
# | 14| 19| 0| 1| 37|[94, 162, 102]| null| [94, 162, 102]| 102|
# | 15| 71| 1| 0| 90| null| [115, 181, 37]| [115, 181, 37]| 115|
# | 16| 46| 0| 1| 117|[162, 102, 90]| null| [162, 102, 90]| 102|
# | 17| 88| 1| 0| 134| null| [181, 37, 117]| [181, 37, 117]| 117|
# | 18| 12| 0| 1| 100|[102, 90, 134]| null| [102, 90, 134]| 102|
# | 19| 24| 1| 0| 36| null| [37, 117, 100]| [37, 117, 100]| 100|
# | 20| 35| 0| 1| 59| [90, 134, 36]| null| [90, 134, 36]| 90|
# +-----+------+--------+---------+-----------------+--------------+---------------+---------------+----------+
I've used the data you provided:
data = [(1, 30), (2,77), (3,73), (4,57), (5,6), (6,37), (7,75), (8,44), (9,50), (10,65),
(11,97), (12,84), (13,18), (14,19), (15,71), (16,46), (17,88), (18,12), (19,24), (20,35)]
sdf = spark.sparkContext.parallelize(data).toDF(['epoch', 'volume'])
# +-----+------+
# |epoch|volume|
# +-----+------+
# | 1| 30|
# | 2| 77|
# | 3| 73|
# | 4| 57|
# | 5| 6|
# +-----+------+
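As an aside, the same DataFrame can also be built directly with `spark.createDataFrame` (a small sketch, same result, skipping the RDD detour):

sdf = spark.createDataFrame(data, ['epoch', 'volume'])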
- From there, I create the odd and even record identifiers. They'll help us in creating the collections. Note that I've assumed that `epoch` is continuous and ordered.
- Then, I sum pairs of `volume` values using a 2-row window (`rowsBetween(-1, 0)`): epoch 2's volume with epoch 1's, epoch 3's with epoch 2's, and so on.
- Creating the collections - I've used the SQL function `collect_list()` with a `when()` condition inside it. For every odd epoch, you collect the previous 3 even epochs' pair sums (from step 2); e.g. for epoch 7, you'll need epoch 6's pair sum, epoch 4's pair sum, and epoch 2's pair sum, i.e. `[43, 130, 107]`. The same approach applies to even epochs. This works because `collect_list()` skips the NULLs that the bare `when()` produces for non-matching rows (see the sketch after this list).
- Once you have the collection, the median is its middle value. The collection, being a list of 3 values, can be sorted and the second value extracted. e.g. `[43, 130, 107]` sorts to `[43, 107, 130]`, and `107` can be extracted with `sort_array(...)[1]` (arrays are 0-indexed). A generalized version of this extraction follows below.
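Here's the sketch referenced in the collection step - a minimal, standalone demonstration (the `tiny` DataFrame is made up purely for illustration) of `collect_list()` dropping the NULLs that a bare `when()` produces:

from pyspark.sql import functions as func

tiny = spark.createDataFrame([(1,), (2,), (3,), (4,)], ['n'])
tiny.select(
    # when() without otherwise() yields NULL for odd n; collect_list skips those NULLs
    func.collect_list(func.when(func.col('n') % 2 == 0, func.col('n'))).alias('evens')
).show()
# expected: [2, 4]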
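And if the look-back window ever grows (while keeping an odd number of collected values), the hard-coded `[1]` can be replaced by a computed middle index. This is a sketch beyond the answer above - it assumes the `all_collections` column already exists and relies on Spark SQL's 0-based array indexing:

import pyspark.sql.functions as func

# middle element of the sorted array; for a size-3 collection this is index 1,
# i.e. the same as sort_array('all_collections')[1] above
sdf = sdf.withColumn(
    'median_val',
    func.expr("sort_array(all_collections)[cast((size(all_collections) - 1) / 2 as int)]")
)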