1

I am trying to implement a rolling minimum that has an amortized O(1) get_min(). The amortized O(1) algorithm comes from the accepted answer in this post


Original function:

import pandas as pd
import numpy as np
from numba import njit, prange

def rolling_min_original(data, n):
    return pd.Series(data).rolling(n).min().to_numpy()

My attempt to implement the amortized O(1) get_min() algorithm:(this function has decent performance for non-small n)

@njit
def rollin_min(data, n):
    """
        brief explanations:

        param: stk2: the stack2 in the algorithm, except here it only stores the min stack
        param: stk2_top: it starts at n-1, and drops gradually until it hits -1 then it comes backup to n-1
            if stk2_top= 0 in the current iteration(it will become -1 at the end):
                that means stk2_top is pointing at the bottom element in stk2, 
            after it drops to -1 from 0, in the next iteration, stk2 will  be reassigned to a new array data[i-n+1:i+1],
            because we need to include the current index. 

        at each iteration:
        if stk2_top <0: (i.e. we have 0 stuff in stk2(aka stk2_top <0)
            - copy the past n items(including the current one) to stk2, so that stk2 has n items now
            - pick the top min from stk2(stk2_top = n-1 momentarily)
            - move down the pointer by 1 after the operation(n-1 becomes n-2)

        else: (i.e. we have j(1<=j<= n-1) stuff in stk2)
            - pick the top min from stk2(stk2_top is j-1 momentarily)
            - move down the pointer by 1 after the operation(j-1 becomes j-2)

    """


    if n >1:  

        def min_getter_rev(arr1):
            arr = arr1[::-1]
            result = np.empty(len(arr), dtype = arr1.dtype)
            result[0]= local_min = arr[0]

            for i in range(1,len(arr)):
                if arr[i] < local_min:
                    local_min = arr[i]
                result[i] = local_min
            return result

        result_min= np.empty(len(data), dtype= data.dtype)
        for i in prange(n-1):
            result_min[i] =np.nan


        stk2 = min_getter_rev(data[:n])
        stk2_top = n-2#it is n-2 because the loop starts at n(not n-1)which is the second non nan term
        stk1_min = data[n-1]#stk1 needs to be the first item of the stk1
        result_min[n-1]= min(stk1_min, stk2[-1])    

        for i in range(n,len(data)):
            if stk2_top >= 0:
                if data[i] < stk1_min:
                    stk1_min= min(data[i], stk1_min)#the stk1 min
                result_min[i] = min(stk1_min, stk2[stk2_top])#min of the top element in stk2 and the current element

            else:
                stk2 = min_getter_rev(data[i-n+1:i+1])
                stk2_top= n-1
                stk1_min = data[i]
                result_min[i]= min(stk1_min, stk2[n-1])

            stk2_top -= 1

        return result_min   
    else:
        return data

A naive implementation when n is small:

@njit(parallel= True)
def rolling_min_smalln(data, n):
    result= np.empty(len(data), dtype= data.dtype)

    for i in prange(n-1):
        result[i]= np.nan

    for i in prange(n-1, len(data)):
        result[i]= data[i-n+1: i+1].min()

    return result

Some little code for testing

def remove_nan(arr):
    return arr[~np.isnan(arr)]

if __name__ == '__main__':

    np.random.seed(0)
    data_size = 200000
    data = np.random.uniform(0,1000, size = data_size)+29000

    w_size = 37

    r_min_original= rolling_min_original(data, w_size)
    rmin1 = rollin_min(data, w_size)

    r_min_original = remove_nan(r_min_original)
    rmin1 = remove_nan(rmin1)

    print(np.array_equal(r_min_original,rmin1))

The function rollin_min() has nearly constant runtime and lower runtime than rolling_min_original() when n is large, which is nice. But it has poor performance when n is low(around n < 37 in my pc, in this range rollin_min() can easily be beaten by a naive implementation rolling_min_smalln()).

I am struggling to find ways to improve rollin_min(), but so far I am stuck, which is why I am seeking for help here.


My questions are the following:

Is the algorithm I am implementing the best out there for rolling/sliding window min/max?

If not, what is the best/better algorithm? If so, how can I further improve the function from the algorithm's point of view?

Besides the algorithm itself, what other ways can further improve the performance of the function rollin_min()?


EDIT: Moved my latest answer to the answer section upon multiple requests

mathguy
  • 1,450
  • 1
  • 16
  • 33
  • 4
    It's often the case that algorithms with good asymptotic complexity aren't as fast as naïve approaches when `n` is small. If you really care about performance for both large and small `n`, your function could examine the value of `n` and choose the appropriate implementation. – NPE Sep 22 '19 at 06:56
  • @NPE Thank you for your reply. Actually I have just created a thin wrapper function that examines `n` to pick the appropriate implementation. But honestly, I consider it as a last resort. If there is nothing else we can do to improve the current implementation/algorithm, then I will use that last resort. – mathguy Sep 22 '19 at 06:59
  • 1
    Implementing a queue with get_min is harder than calculating the rolling min for the whole array all at once. Do you really need to implement the queue? – Matt Timmermans Sep 22 '19 at 21:22
  • @MattTimmermans That’s a good point. Implementing a queue is definitely harder, but the speed of function is the primary concern here. So no matter how hard the algorithm it seems I will try to implement it, as long as it actually speeds up the rolling min function. Besides, in this function I merely implemented the get_min() and a reversed numpy array that acts like a stack; I did not implement the full queue data structure. – mathguy Sep 22 '19 at 21:31
  • You shouldn't really include a solution in your answer. If the solution works it would be better to put it into an answer (yes, you can answer your own questions!). – MSeifert Sep 24 '19 at 14:02
  • @MSeifert I was hesitant to put it into an answer because I feel the final version of the function still has room for improvement. When this multicore version is run individually, it certainly is the fastest(only about 20%-30 faster)if n is of medium size. However when I put the final version into a larger function that was thrown into multiprocessing pool of workers, the version given by Tim in the answer outperforms my own(about 0.1 to 2% faster). In other words, my version is slightly slower in an environment where other cpu cores are busy/occupied with tasks. – mathguy Sep 24 '19 at 22:40

2 Answers2

3

The primary cause of slowness in your code is probably the allocation of a new array in min_getter_rev. You should reuse the same storage throughout.

Then, because you don't really have to implement a queue, you can make more optimizations. For example the size of the two stacks is at most (and usually) n, so you you can keep them in the same array with size n. Grow one from the start and one from the end.

You would notice that there is a very regular pattern - fill the array from start to end in order, recalculate the minimums from the end, generate output as you refill the array, repeat...

This leads to an actually simpler algorithm with a simpler explanation that doesn't refer to stacks at all. Here is an implementation, with comments about how it works. Note that I didn't bother stuffing the start with NaNs:

def rollin_min(data, n):

    #allocate the result.  Note the number valid windows is len(data)-(n-1)
    result = np.empty(len(data)-(n-1), data.dtype)

    #every nth position is a "mark"
    #every window therefore contains exactly 1 mark
    #the minimum in the window is the minimum of:
    #  the minimum from the window start to the following mark; and
    #  the minimum from the window end the the preceding (same) mark

    #calculate the minimum from every window start index to the next mark
    for mark in range(n-1, len(data), n):
        v = data[mark]
        if (mark < len(result)):
            result[mark] = v
        for i in range(mark-1, mark-n, -1):
            v = min(data[i],v)
            if (i < len(result)):
                result[i] = v

    #for each window, calculate the running total from the preceding mark
    # to its end.  The first window ends at the first mark
    #then combine it with the first distance to get the window minimum

    nextMarkPos = 0
    for i in range(0,len(result)):
        if i == nextMarkPos:
             v = data[i+n-1]
             nextMarkPos += n
        else:
            v = min(data[i+n-1],v)
        result[i] = min(result[i],v)

    return result
Matt Timmermans
  • 53,709
  • 3
  • 46
  • 87
  • Great! This function is quite a bit faster than my first attempt for a wide range of n. I have just come up with a solution(in my head) that only needs to traverse the data exactly once, but I need quite a bit of time to implement it. – mathguy Sep 23 '19 at 00:43
  • Just finished making a cpu-multi-core version of a simpler implementation. – mathguy Sep 23 '19 at 07:07
  • @mathguy why don't you post it as another answer? – jangorecki Sep 28 '19 at 08:15
  • @jangorecki I don't think my multi-core implementation is good enough. Maybe I should. – mathguy Sep 28 '19 at 08:16
  • @mathguy if it returns correct results it is just another answer, and can inspire readers. – jangorecki Sep 28 '19 at 08:37
  • @jangorecki just posted it as another answer. Hopefully future readers can improve it further. – mathguy Sep 28 '19 at 08:46
1

Moved this from the Question EDIT section to here upon multiple requests.

Inspired by the simpler implementation given by Matt Timmermans in the answer, I have made a cpu-multicore version of the rolling min. The code is as follows:

@njit(parallel= True)
def rollin_min2(data, n):
    """
    1) make a loop that iterates over K sections of n elements; each section is independent so that it can benefit from multicores cpu 
    2) for each iteration of sections, generate backward local minimum(sec_min2) and forward minimum(sec_min1)

    say m=8, len(data)= 23, then we only need the idx= (reversed to 7,6,5,...,1,0 (0 means minimum up until idx= 0)),
    1st iter
    result[7]= min_until 0,
    result[8]= min(min(data[7:9]) and min_until 1),
    result[9]= min(min(data[7:10]) and m_til 2)
    ...
    result[14]= min(min(data[7:15]) and m_til 7) 

    2nd iter
    result[15]= min_until 8,
    result[16]= min(min(data[15:17]) and m_til 9),
    result[17]= min(min(data[15:18]) and m_til 10)
    ...
    result[22]= min(min(data[15:23]) and m_til 15) 


    """
    ar_len= len(data)

    sec_min1= np.empty(ar_len, dtype = data.dtype)
    sec_min2= np.empty(ar_len, dtype = data.dtype)

    for i in prange(n-1):
        sec_min1[i]= np.nan

    for sec in prange(ar_len//n):
        s2_min= data[n*sec+ n-1]
        s1_min= data[n*sec+ n]

        for i in range(n-1,-1,-1):
            if data[n*sec+i] < s2_min:
                s2_min= data[n*sec+i]
            sec_min2[n*sec+i]= s2_min

        sec_min1[n*sec+ n-1]= sec_min2[n*sec]

        for i in range(n-1):
            if n*sec+n+i < ar_len:
                if data[n*sec+n+i] < s1_min:
                    s1_min= data[n*sec+n+i]
                sec_min1[n*sec+n+i]= min(s1_min, sec_min2[n*sec+i+1])

            else:
                break

    return sec_min1 

I have actually spent an hour testing various implementations of rolling min. In my 6C/12T laptop, this multi-core version works best when n is "medium size". When n is at least 30% of the length of the source data though, other implementation starts to outshine. There must be even better ways to improve this function, but at the time of this edit I am not aware of them just yet.

mathguy
  • 1,450
  • 1
  • 16
  • 33