1

I wanted to practically calculate a basic dataframe.column.rolling(window).max() but the window is another column of arbitrary integers derived at an earlier stage.

However: methods similar to those found here or here or here appear to be extremely slow to the point of being unusable when the dataframe is large.

I suspect it's because SIMD hardware may prefer a constant nature of window sizes but I wonder if there is a way I miss.

Example data (as found in the first method linked above):

import pandas as pd
import numpy as np

np.random.seed([3,14])
a = np.random.randn(20).cumsum()
w = np.minimum(
    np.random.randint(1, 4, size=a.shape),
    np.arange(len(a))+1
)

df = pd.DataFrame({'Data': a, 'Window': w})
df 
        Data  Window
0  -0.602923       1
1  -1.005579       2
2  -0.703250       3
3  -1.227599       1
4  -0.683756       1
5  -0.670621       2
6  -0.997120       1
7   0.387956       3
8   0.255502       1
9  -0.152361       2
10  1.150534       3
11  0.546298       3
12  0.302936       3
13  0.091674       1
14 -1.964947       1
15 -1.447079       2
16 -1.487828       1
17 -2.539703       1
18 -1.932612       3
19 -4.163049       2

Expected result of an equivalent rolling window max:

0  -0.602923 
1  -0.602923
2  -0.602923 
3  -1.227599 
4  -0.683756 
5  -0.670621 
6  -0.997120 
7   0.387956 
8   0.255502 
9   0.255502  
10  1.150534  
11  1.150534 
12  1.150534
13  0.091674 
14 -1.964947 
15 -1.447079 
16 -1.487828
17 -2.539703 
18 -1.487828 
19 -1.932612 

j riv
  • 3,593
  • 6
  • 39
  • 54
  • 1
    Can you provide a concrete example? (ideally full reproducible code to copy/paste) – mozway Apr 09 '22 at 05:35
  • @mozway The Setup and result in this answer is exactly the data and result needed, https://stackoverflow.com/a/71803558/277716 , I'll add it to the question above. – j riv Apr 09 '22 at 05:43
  • So is this question the *same* as https://stackoverflow.com/questions/71801700/is-there-a-way-to-have-a-rolling-window-that-varies-based-on-an-arbitrary-seri (for which you've already accepted an answer), but you want a faster solution? If so, isn't this just a duplicate question? – Warren Weckesser Apr 09 '22 at 08:39
  • @WarrenWeckesser - Below the accepted answer is an explanatory comment that I think is appropriate. – Michael Szczesny Apr 09 '22 at 08:42
  • 1
    OK. I'm not the SO police, but I think a more appropriate approach would be to simply not accept that answer ("works, but it is too slow to accept!") instead of asking the same question again. – Warren Weckesser Apr 09 '22 at 08:49
  • @WarrenWeckesser I think the problem is that that title did not specify it's for performance (only the last sentence in the text), and the answer is still good for others reading. This question is very clear it's only about performance. – j riv Apr 09 '22 at 10:32
  • Can the title not be edited? – Warren Weckesser Apr 09 '22 at 13:11
  • @WarrenWeckesser editing a title to decisively give a different nature to a question, keeps the answers that were given before the title was for something else. I think it disrupts the work of those giving the answers and others who have already used a question to be helped. – j riv Apr 09 '22 at 13:15
  • The nature isn't *that* different, and there is only one answer to the other question, so it would hardly be disruptive, but like I said, I'm not the SO police, so I'll stop whining. :) – Warren Weckesser Apr 09 '22 at 13:20
  • I think the only issue is that the other question has as a last sentence that performance is preferable, though it's not clearly duplicating because it doesn't make it very clear other answers are irrelevant. This one makes it blatantly clear that if a method is not faster than almost anything in these older questions: it's off topic. – j riv Apr 09 '22 at 13:22

3 Answers3

4

Here's a NumPy-only vectorized method. It isn't as fast as @MichaelSzczesny's custom indexer with the Cython engine, but it might be useful if it turns out that the bug in the Pandas code hasn't been fixed yet.

def windowed_max(a, w):
    k = np.arange(1, len(w)+1)
    windows = np.column_stack((np.maximum(k - w, 0), k))
    return np.maximum.reduceat(a, windows.ravel()[:-1])[::2]

For example,

In [350]: np.random.seed([3,14])
     ...: a = np.random.randn(20).cumsum()
     ...: w = np.minimum(
     ...:     np.random.randint(1, 4, size=a.shape),
     ...:     np.arange(len(a))+1
     ...: )

In [351]: windowed_max(a, w)
Out[351]: 
array([-0.60292337, -0.60292337, -0.60292337, -1.227599  , -0.68375631,
       -0.67062118, -0.99711965,  0.38795643,  0.25550199,  0.25550199,
        1.15053366,  1.15053366,  1.15053366,  0.09167441, -1.96494674,
       -1.44707895, -1.48782801, -2.53970325, -1.48782801, -1.93261164])

In [352]: windowed_max([2, 0, 1], [1, 1, 3])
Out[352]: array([2, 0, 2])

To use it with your Pandas Dataframe, you could write:

df['wmax'] =  windowed_max(df.Data.to_numpy(), df.Window.to_numpy())
Warren Weckesser
  • 110,654
  • 19
  • 194
  • 214
  • Your method is actually the 2nd fastest of all I have tried (including the one from the previous question) and it doesn't seem to be more than ~40% slower than the other answer here, so definitely +1 and a very good substitute until that bug gets fixed. The only technical issue I faced with yours is that it appears to require the first few windows to not be larger than the indices but I guess that's normal and the responsibility of whoever produces the input for it. – j riv Apr 09 '22 at 18:00
  • 1
    *"... it appears to require the first few windows to not be larger than the indices..."* I modified the code to use `np.maximum(k - w, 0)` instead of just `k - w`. That should take care of an input that requests a window size that is larger that the current index. – Warren Weckesser Apr 09 '22 at 19:36
  • I [parallelized](https://stackoverflow.com/a/71818043/14277722) your surprisingly fast solution with `numba`. – Michael Szczesny Apr 10 '22 at 15:31
3

The fastest solution for pandas 1.3.5 I could find is a custom indexer

import pandas as pd
from pandas import api
import numpy as np

class MultiWindowIndexer(api.indexers.BaseIndexer):
    def __init__(self, window):
        self.window = np.array(window)
        super().__init__()

    def get_window_bounds(self, num_values, min_periods, center, closed):
        end = np.arange(num_values, dtype='int64') + 1
        start = np.clip(end - self.window, 0, num_values)
        return start, end

Using the cython implementation of max (which is the default) with 2*10**6 elements, windows of maximal length of 10

np.random.seed([3,14])
a = np.random.randn(2*10**6).cumsum()
w = np.minimum(
    np.random.randint(1, 10, size=a.shape),
    np.arange(len(a))+1
)

df = pd.DataFrame({'Data': a, 'Window': w})

df['max1'] = df.Data.rolling(MultiWindowIndexer(df.Window)).max(engine='cython')
# %timeit 10 loops, best of 5: 116 ms per loop

The numba engine is ~2.4x slower

df['max2'] = df.Data.rolling(MultiWindowIndexer(df.Window)).max(engine='numba')
# %timeit 1 loop, best of 5: 278 ms per loop

[Update: presumably not fixed in pandas 1.4.x]
Oddly enough, the results are different. Unfortunately the cython result is not correct as it uses wrong starting indices for the maximum.

np.testing.assert_allclose(df.max2, df.max1)

Output

Not equal to tolerance rtol=1e-07, atol=0

Mismatched elements: 448192 / 2000000 (22.4%)

Analysis of the bug

The cython implementation seems to remember the largest starting index encountered so far and 'clips' smaller starting indices to the stored value.
More technically correct: only stores the range of the largest start and largest end indices encountered so far in a queue, discarding smaller start indices and making them unavailable.

Michael Szczesny
  • 4,911
  • 5
  • 15
  • 32
  • 1
    I can reproduce the apparent bug with this data: `a = np.array([2.0, 0.0, 1.0])`, `w = np.array([1, 1, 3])`. When put into a dataframe and processed using your `MultiWindowIndex`, the output is `[2., 0., 1.]`, but it should be `[2., 0., 2.]`. I'm using an old version (1.3.4) of Pandas, so I don't know if the problem occurs in the latest release. – Warren Weckesser Apr 09 '22 at 09:32
  • With `w = np.array([1, 2, 3])` it works correctly. So not the size, but the gaps are crucial. Thanks for the easy to understand example. – Michael Szczesny Apr 09 '22 at 09:47
  • If you mean the results are wrong only for the first few indices, then it might be ok for several uses, as it's mainly a topic for performance and many practical cases need only results after the first few indices. – j riv Apr 09 '22 at 10:35
  • 1
    @MichaelSzczesny I get the same result with both settings, so I wonder if it's a problem with module versions (here it's python 3.8.10 pandas 1.4.2 numpy 1.22.3 numba 0.53.1). It's also obscenely faster compared to other methods I tried. +1 for now though it's uncertain on reproducibility (perhaps both the results I get are wrong!). – j riv Apr 09 '22 at 12:22
  • 1
    I just found this [comment](https://github.com/pandas-dev/pandas/blob/1.4.x/pandas/_libs/window/aggregations.pyx#L982) about an update for variable window sizes in the `cython` implementation. Seems to be fixed in `1.4.x`! – Michael Szczesny Apr 09 '22 at 12:38
  • I should add for what it's worth that the answer at https://stackoverflow.com/a/71803558/277716 (albeit noticeably slower than this but still faster than others) appears to agree with results from this method. I had opened a question at the pandas github about this subject https://github.com/pandas-dev/pandas/issues/46716 and if this answer appears correct: it looks like a good candidate for a new pandas feature especially due to its simplicity of usage after the initial indexer is set up (but also the fact it's performant). – j riv Apr 09 '22 at 12:51
  • Something incorrect I noticed in relation to the OP code and expected outcome, is that at index 18 the result is incorrectly -1.932612 instead of -1.487828, which is very odd considering all other results appear correct. – j riv Apr 09 '22 at 13:31
  • That is exactly the incorrect result with `pandas 1.3.5` as well. Slice is `17:19` but should be `16:19`. I guess the fix for `1.4.x` doesn't cover all cases. The indexer produces the correct start and end for the slice `[16,19]`. – Michael Szczesny Apr 09 '22 at 13:39
  • I added the discrepancy as a bug report with reproducibility code to the github issue mentioned above. It's a bit messy though because the ticket had originally started as a question and possible feature request, so I don't know if you want to contribute regarding the issue with a clearer bug report. – j riv Apr 09 '22 at 14:39
  • 1
    This has now its own bug report, https://github.com/pandas-dev/pandas/issues/46726, and it appears to be looked at. – j riv Apr 10 '22 at 19:16
1

A parallelized version of @WarrenWeckesser solution with numba.

import numba as nb    # tested with numba 0.55.1

@nb.njit(parallel=True)
def win_max(a, w):
    end = np.arange(1, len(w) + 1)
    start = np.maximum(end - w, 0)
    res = np.empty(len(w))
    for i in nb.prange(len(w)):
        maxi = a[start[i]]
        for j in range(start[i] + 1, end[i]):
            if a[j] > maxi:
                maxi = a[j]
        res[i] = maxi
    return res

Benchmark on a 2-core colab instance.

import numpy as np

np.random.seed([3,14])
a = np.random.randn(2*10**6).cumsum()
w = np.minimum(
    np.random.randint(1, 10, size=a.shape),
    np.arange(len(a))+1
)

%timeit windowed_max(a, w)
# 10 loops, best of 5: 172 ms per loop
%timeit win_max(a, w)
# 10 loops, best of 5: 21.8 ms per loop

np.testing.assert_allclose(win_max(a, w), windowed_max(a, w))
Michael Szczesny
  • 4,911
  • 5
  • 15
  • 32
  • Thanks because I guess it might be faster for some use cases, but I should point out that for my case it's extremely slower than any of the other two methods here, because I suspect it relies on slow forking while my dataframes while not small: they are not big enough to warrant the overhead (and most of the time I spawn multiple processes for each method anyway so I was definitely not the target audience of this method to begin with). – j riv Apr 11 '22 at 12:03
  • Did you measure the second call or the first call including the compile time? With [`@nb.jit`](https://numba.pydata.org/numba-doc/latest/user/jit.html#lazy-compilation) *compilation is deferred until the first function execution*. – Michael Szczesny Apr 11 '22 at 15:57
  • 1
    I was just measuring win_max(), while I was going through several dfs in a function that wraps it (and didn't wrap the definition). Each df was taking like 4 seconds when the other two methods take like 0.0015 or 0.002 (!). – j riv Apr 12 '22 at 07:18
  • 1
    I realize you might imply the setup of my application or environment was making it re-compile every single time, though I'm not sure why it would do that, since the function definition was at module scope. – j riv Apr 12 '22 at 07:25
  • Interesting, but it is beyond my expertise in `numba` (or `C`) to explain these very different results. Thank you for sharing your results. My benchmarks can be reproduced on a free Google Colab instance (~5x faster than the *custom indexer*). – Michael Szczesny Apr 12 '22 at 08:06