
The source code in question is

import numpy as np
dd=lambda x: np.nanmax(1.0 - x / np.fmax.accumulate(x))
df.rolling(window=period, min_periods=1).apply(dd)

The two lines above take an extremely long time to execute on the latest pandas version (1.4.0), even though the DataFrame has only 3000 rows and 2000 columns.

The same code runs much faster on a previous pandas version (0.23.x).

I've tried suggestions from other questions, like Slow performance of pandas groupby/apply, but they were not of much help.

`period` is an int variable with value 250.
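
For reference, a minimal setup that reproduces the problem (the random data is an assumption; only the shape and `period` come from the question):

import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.rand(3000, 2000))  # 3000 rows x 2000 columns
period = 250

dd = lambda x: np.nanmax(1.0 - x / np.fmax.accumulate(x))  # max drawdown over each window
df.rolling(window=period, min_periods=1).apply(dd)  # the slow call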

jagpreet
  • Since it seems to be working differently in an older pandas version, have you already posted this as an issue on [their github](https://github.com/pandas-dev/pandas/issues)? – FlyingTeller Apr 08 '22 at 11:09
  • @FlyingTeller. Not yet. I'm not even aware of what working difference you are referring to. – jagpreet Apr 08 '22 at 11:15
  • Referring to your sentence `Same code with previous pandas version(0.23.x) provides result much faster.` This sounds to me like you are not doing anything wrong, but that the new pandas version introduced changes that make it slower. – FlyingTeller Apr 08 '22 at 11:19
  • Thanks @FlyingTeller I've raised the issue on the link provided. – jagpreet Apr 08 '22 at 11:36
  • What does `much faster` mean? Did you measure the time? You could show timing results. – furas Apr 08 '22 at 12:30

2 Answers


These are not solutions, at most workarounds for simple cases like the example function. But they confirm the suspicion that the processing speed of `df.rolling.apply` is anything but optimal.

Using a much smaller dataset for obvious reasons

import pandas as pd
import numpy as np

df = pd.DataFrame(
    np.random.rand(200,100)
)
period = 10
res = [0,0]

Running time with pandas v1.3.5

%%timeit -n1 -r1
dd=lambda x: np.nanmax(1.0 - x / np.fmax.accumulate(x))
res[0] = df.rolling(window=period, min_periods=1).apply(dd)
# 1 loop, best of 1: 8.72 s per loop

Against a numpy implementation

from numpy.lib.stride_tricks import sliding_window_view as window

%%timeit
# prepend period-1 rows of NaN so the first rows get (partially empty) windows,
# mimicking rolling(..., min_periods=1), then build a view of all windows at once
x = window(np.vstack([np.full((period-1, df.shape[1]), np.nan), df.to_numpy()]), period, axis=0)
res[1] = np.nanmax(1.0 - x / np.fmax.accumulate(x, axis=-1), axis=-1)
# 100 loops, best of 5: 3.39 ms per loop

np.testing.assert_allclose(res[0], res[1])

8.72 s / 3.39 ms = 8720 / 3.39 ≈ 2572x speedup.
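
To illustrate what the NaN padding does, here is a small sketch of my own, using the `window` alias from above:

a = np.array([[1.0], [2.0], [0.5]])  # 3 rows, 1 column
w = window(np.vstack([np.full((2, 1), np.nan), a]), 3, axis=0)
print(w.shape)   # (3, 1, 3): one length-3 window per original row
print(w[0, 0])   # [nan nan 1.] -- the data rolling(min_periods=1) sees for row 0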


Processing columns in chunks

l = []
for arr in np.array_split(df.to_numpy(), 100, 1):  # split the columns into 100 chunks to bound the temporary array's size
    x = window(np.vstack([np.full((period-1, arr.shape[1]), np.nan), arr]), period, axis=0)
    l.append(np.nanmax(1.0 - x / np.fmax.accumulate(x, axis=-1), axis=-1))
res[1] = np.hstack(l)  # reassemble the full result
# 1 loop, best of 5: 9.15 s per loop for df.shape (2000,2000)

Using pandas numba engine

We can get even faster with pandas' support for numba-jitted functions. Unfortunately, numba v0.55.1 can't compile `ufunc.accumulate`, so we have to write our own implementation of `np.fmax.accumulate` (no guarantees on my implementation). Please note that the first call is slower because the function needs to be compiled.

def dd_numba(x):
    # manual replacement for np.fmax.accumulate: a running maximum that skips NaNs
    res = np.empty_like(x)
    res[0] = x[0]
    for i in range(1, len(res)):
        if res[i-1] > x[i] or np.isnan(x[i]):
            res[i] = res[i-1]
        else:
            res[i] = x[i]
    return np.nanmax(1.0 - x / res)

df.rolling(window=period, min_periods=1).apply(dd_numba, engine='numba', raw=True)

We can use the familiar pandas interface and it's ~1.16x faster than my chunked numpy approach for df.shape (2000,2000).
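
As a quick sanity check (an addition, not part of the original answer), the hand-rolled running maximum can be compared against np.fmax.accumulate on data containing NaNs:

x = np.random.rand(1000)
x[42] = np.nan  # verify the NaN handling matches

res = np.empty_like(x)
res[0] = x[0]
for i in range(1, len(res)):
    res[i] = res[i-1] if (res[i-1] > x[i] or np.isnan(x[i])) else x[i]

np.testing.assert_allclose(res, np.fmax.accumulate(x))  # passes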

Michael Szczesny
  • Thanks @Michael. The workaround seems to work fantastically well on small dataframes, however it doesn't seem to work on the dataframe size I mentioned. Error code is "numpy.core._exceptions._ArrayMemoryError: Unable to allocate 20.6 GiB for an array with shape (5909, 1874, 250) and data type float64". I guess I need to segment the DF into pieces to make it work. Any suggestions? – jagpreet Apr 08 '22 at 18:09
  • I see, the vectorized function is trying to allocate 20 GiB. I have enough memory, so I didn't notice. Splitting every 100 columns should work for your example, since each column can be calculated separately (see the size estimate after these comments). – Michael Szczesny Apr 08 '22 at 18:50
  • I added a solution to process columns in chunks and then combine them. – Michael Szczesny Apr 08 '22 at 19:04
  • Thanks Michael. I figured out my own vectorized solution, taking chunks of 10 columns. Now just to tell you the performance difference: the source code mentioned in the question takes almost 50 minutes to produce the output, while the vectorized function using your approach gives the output in 40 seconds. Thank you so much. The only thing that remains is that I'm yet to understand the workaround you suggested. – jagpreet Apr 08 '22 at 19:11
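
For scale, the 20.6 GiB allocation from the MemoryError above can be estimated directly from the reported array shape (a quick back-of-the-envelope check):

rows, cols, window_len = 5909, 1874, 250  # shape from the error message
gib = rows * cols * window_len * 8 / 2**30  # float64 = 8 bytes per element
print(f'{gib:.1f} GiB')  # 20.6 GiB
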

Take a look at the parallel-pandas library. With its help, you can parallelize the apply method of a rolling window. Thanks to Michael Szczesny for the dd_numba function. I used a DataFrame of the size you mentioned.
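
parallel-pandas is distributed on PyPI (a standard pip install is assumed here):

pip install parallel-pandas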

import pandas as pd
import numpy as np
from time import monotonic
from parallel_pandas import ParallelPandas


def dd_numba(x):
    # Michael Szczesny's manual np.fmax.accumulate replacement from the first answer
    res = np.empty_like(x)
    res[0] = x[0]
    for i in range(1, len(res)):
        if res[i - 1] > x[i] or np.isnan(x[i]):
            res[i] = res[i - 1]
        else:
            res[i] = x[i]
    return np.nanmax(1.0 - x / res)


if __name__ == '__main__':
    # initialize parallel-pandas
    ParallelPandas.initialize(n_cpu=4, split_factor=1)
    df = pd.DataFrame(np.random.rand(3000, 2000))
    period = 250
    dd = lambda x: np.nanmax(1.0 - x / np.fmax.accumulate(x))

    start = monotonic()
    res = df.rolling(window=period, min_periods=1).apply(dd)
    print(f'synchronous time took: {monotonic() - start:.1f} s.')

    start = monotonic()
    res = df.rolling(window=period, min_periods=1).apply(dd, raw=True)
    print(f'with raw=True time took: {monotonic() - start:.1f} s.')

    start = monotonic()
    res = df.rolling(window=period, min_periods=1).apply(dd_numba, raw=True, engine='numba')
    print(f'numba engine time took: {monotonic() - start:.1f} s.')

    start = monotonic()
    res = df.rolling(window=period, min_periods=1).p_apply(dd, raw=True)
    print(f'parallel with raw=True time took: {monotonic() - start:.1f} s.')
    start = monotonic()
    res = df.rolling(window=period, min_periods=1).p_apply(dd_numba,  raw=True, engine='numba')
    print(f'parallel with raw=True and numba time took: {monotonic() - start:.1f} s.')

Output:
synchronous time took: 994.6 s.
with raw=True time took: 48.6 s.
numba engine time took: 9.8 s.
parallel with raw=True time took: 13.5 s.
parallel with raw=True and numba time took: 1.5 s.

994.6 / 1.5 ≈ 663x speedup.

padu