I am trying to implement a rolling minimum that has an amortized O(1) get_min(). The amortized O(1) algorithm comes from the accepted answer in this post.
Original function:
import pandas as pd
import numpy as np
from numba import njit, prange
def rolling_min_original(data, n):
    return pd.Series(data).rolling(n).min().to_numpy()
My attempt to implement the amortized O(1) get_min() algorithm (this function performs decently when n is not small):
@njit
def rollin_min(data, n):
    """
    Brief explanation:
    stk2: the stack2 in the algorithm, except that here it only stores the min stack
    stk2_top: index of the top of stk2; it starts at n-1 and drops gradually until it hits -1,
        then it comes back up to n-1
        if stk2_top == 0 in the current iteration (it will become -1 at the end):
            that means stk2_top is pointing at the bottom element in stk2;
            after it drops from 0 to -1, in the next iteration stk2 is reassigned
            to a new array built from data[i-n+1:i+1],
            because we need to include the current index.

    At each iteration:
    if stk2_top < 0 (i.e. stk2 is empty):
        - copy the past n items (including the current one) to stk2, so that stk2 has n items now
        - pick the top min from stk2 (stk2_top = n-1 momentarily)
        - move the pointer down by 1 after the operation (n-1 becomes n-2)
    else (i.e. stk2 holds j items, 1 <= j <= n-1):
        - pick the top min from stk2 (stk2_top is j-1 momentarily)
        - move the pointer down by 1 after the operation (j-1 becomes j-2)
    """
    if n > 1:
        def min_getter_rev(arr1):
            # result[k] is the minimum of the last k+1 items of arr1
            arr = arr1[::-1]
            result = np.empty(len(arr), dtype=arr1.dtype)
            result[0] = local_min = arr[0]
            for i in range(1, len(arr)):
                if arr[i] < local_min:
                    local_min = arr[i]
                result[i] = local_min
            return result

        # Note: filling with np.nan assumes data has a floating-point dtype
        result_min = np.empty(len(data), dtype=data.dtype)
        for i in prange(n - 1):
            result_min[i] = np.nan
        stk2 = min_getter_rev(data[:n])
        stk2_top = n - 2  # n-2 because the loop starts at i = n (not n-1), which is the second non-NaN term
        stk1_min = data[n - 1]  # stk1_min starts as the first item of stk1
        result_min[n - 1] = min(stk1_min, stk2[-1])
        for i in range(n, len(data)):
            if stk2_top >= 0:
                if data[i] < stk1_min:
                    stk1_min = data[i]  # the stk1 min
                # min of the stk1 minimum and the top entry of stk2
                result_min[i] = min(stk1_min, stk2[stk2_top])
            else:
                stk2 = min_getter_rev(data[i - n + 1:i + 1])
                stk2_top = n - 1
                stk1_min = data[i]
                result_min[i] = min(stk1_min, stk2[n - 1])
            stk2_top -= 1
        return result_min
    else:
        return data
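For readers who do not know the linked answer, here is a minimal plain-Python sketch of the two-stack queue idea that the docstring above refers to. It is not the Numba code above and is not tuned for speed; the names MinQueue and rolling_min_two_stacks are made up for this illustration, and None stands in for the NaN prefix.

```python
class MinQueue:
    """FIFO queue with amortized O(1) push, pop and get_min, built from two stacks."""

    def __init__(self):
        # Each stack entry is (value, min of this entry and everything below it).
        self._in = []   # receives new items (stack1)
        self._out = []  # serves the oldest items (stack2)

    @staticmethod
    def _push_onto(stack, value):
        current_min = value if not stack else min(value, stack[-1][1])
        stack.append((value, current_min))

    def push(self, value):
        self._push_onto(self._in, value)

    def pop(self):
        if not self._out:
            # Moving everything over reverses the order,
            # so the oldest item ends up on top of the out-stack.
            while self._in:
                value, _ = self._in.pop()
                self._push_onto(self._out, value)
        return self._out.pop()[0]

    def get_min(self):
        # The top entry of each non-empty stack carries that stack's minimum.
        return min(s[-1][1] for s in (self._in, self._out) if s)

    def __len__(self):
        return len(self._in) + len(self._out)


def rolling_min_two_stacks(data, n):
    """Rolling minimum over windows of length n; the first n-1 entries are None."""
    queue = MinQueue()
    result = []
    for value in data:
        queue.push(value)
        if len(queue) > n:
            queue.pop()  # drop the item that just left the window
        result.append(queue.get_min() if len(queue) == n else None)
    return result
```

For example, rolling_min_two_stacks([4, 2, 5, 1, 3, 6], 3) gives [None, None, 2, 1, 1, 1]. Each item is pushed and popped at most once per stack, which is where the amortized O(1) bound comes from.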
A naive implementation for when n is small:
@njit(parallel= True)
def rolling_min_smalln(data, n):
    result = np.empty(len(data), dtype=data.dtype)
    for i in prange(n - 1):
        result[i] = np.nan
    for i in prange(n - 1, len(data)):
        result[i] = data[i - n + 1:i + 1].min()
    return result
A small test script:
def remove_nan(arr):
    return arr[~np.isnan(arr)]

if __name__ == '__main__':
    np.random.seed(0)
    data_size = 200000
    data = np.random.uniform(0, 1000, size=data_size) + 29000
    w_size = 37
    r_min_original = rolling_min_original(data, w_size)
    rmin1 = rollin_min(data, w_size)
    r_min_original = remove_nan(r_min_original)
    rmin1 = remove_nan(rmin1)
    print(np.array_equal(r_min_original, rmin1))
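The test above checks a single window size and strips the NaNs before comparing, so a misplaced NaN prefix would go unnoticed. Below is a rough sketch of a stricter check that compares position by position against a brute-force reference over several window sizes. It uses only the standard library, and the helper names (brute_force_rolling_min, same_with_nan, check_window_sizes) are made up for this illustration.

```python
import math
import random


def brute_force_rolling_min(data, n):
    """O(len(data) * n) reference: NaN for the first n-1 positions, window min after that."""
    out = [math.nan] * len(data)
    for i in range(n - 1, len(data)):
        out[i] = min(data[i - n + 1:i + 1])
    return out


def same_with_nan(a, b):
    """Element-wise equality where NaN only matches NaN at the same position."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        if math.isnan(x) or math.isnan(y):
            if not (math.isnan(x) and math.isnan(y)):
                return False
        elif x != y:
            return False
    return True


def check_window_sizes(candidate, sizes, length=500, seed=0):
    """Return the window sizes for which candidate(data, n) disagrees with the reference."""
    rng = random.Random(seed)
    data = [rng.uniform(29000, 30000) for _ in range(length)]
    return [
        n for n in sizes
        if not same_with_nan(list(candidate(data, n)), brute_force_rolling_min(data, n))
    ]
```

To run it against the Numba function, the list would need converting first, for instance check_window_sizes(lambda d, n: rollin_min(np.asarray(d), n), [2, 3, 37, 38, 74, 75]). An empty list means every window size matched.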
The runtime of rollin_min() is nearly constant with respect to n, and it is faster than rolling_min_original() when n is large, which is nice. But it performs poorly when n is small (roughly n < 37 on my PC); in that range rollin_min() is easily beaten by the naive implementation rolling_min_smalln().
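The crossover point will differ from machine to machine, so here is a small timing sketch for locating it. It is only a rough harness built on the standard timeit module; best_time and fastest_by_window are hypothetical helper names, and the warm-up call is there so that one-off costs such as JIT compilation are not included in the measurement.

```python
import timeit


def best_time(func, data, n, repeats=3, number=1):
    """Smallest wall-clock time (seconds) of func(data, n) over several repeats."""
    func(data, n)  # warm-up call, excluded from the timing
    return min(timeit.repeat(lambda: func(data, n), repeat=repeats, number=number))


def fastest_by_window(funcs, data, sizes):
    """Map each window size n to the name of the fastest function in funcs."""
    winners = {}
    for n in sizes:
        timings = {name: best_time(f, data, n) for name, f in funcs.items()}
        winners[n] = min(timings, key=timings.get)
    return winners
```

With the functions from this question it could be called as fastest_by_window({"two_stack": rollin_min, "naive": rolling_min_smalln}, data, range(2, 64)). The window size at which the winner changes is the machine-specific threshold.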
I am struggling to find ways to improve rollin_min(), and so far I am stuck, which is why I am asking for help here.
My questions are the following:
Is the algorithm I am implementing the best out there for rolling/sliding window min/max?
If not, what is the best/better algorithm? If so, how can I further improve the function from the algorithm's point of view?
Besides the algorithm itself, what other ways can further improve the performance of the function rollin_min()?
EDIT: Moved my latest answer to the answer section upon multiple requests