1

I wrote this function to perform a rolling sum on numpy arrays, inspired by this post

def np_rolling_sum(arr, n, axis=0):
    out = np.cumsum(arr, axis=axis)
    slc1 = [slice(None)] * len(arr.shape)
    slc2 = [slice(None)] * len(arr.shape)
    slc1[axis] = slice(n, None)
    slc2[axis] = slice(None, -n)
    out = out[tuple(slc1)] - out[tuple(slc2)]
    shape = list(out.shape)
    shape[axis] = arr.shape[axis] - out.shape[axis]
    out = np.concatenate((np.full(shape, 0), out), axis=axis)
    return out

It works fine, except when I need to use it on large arrays (size is around 1bn). In that case, I get a SIGKILL on this line:

out = out[tuple(slc1)] - out[tuple(slc2)]

I already tried to delete arr after the cumsum since I no more need it (except from its shape that I can store before the deletion), but it didn't help.

My next guess would be to implement a batch management for the operation causing the memory issue. Is there another way for me to write this function better so it will be able to deal with larger arrays ?

Thanks for your help !

Gwalchaved
  • 37
  • 9
  • One option would be to create a copy of "out[tuple(slc1)]" in a memory mapped file and subtract "out[tuple(slc2)]" from it inplace. – Michael Butscher Jul 08 '22 at 10:02

1 Answers1

1

For people who might be interested, I finally added a decorator that checks if numpy arguments are greater than a given size. If so, it turns them into dask arrays.

In order to keep the main function closest to the original, I also added an argument that indicates which library should be used: numpy or dask.array

Here is the final result:

import numpy as np
import dask.array as da

threshold = 50_000_000

def large_file_handler(func):

    def wrapper(*args, **kwargs):

        pos = list(args)
        for i in range(len(pos)):
            if type(pos[i]) == np.ndarray and pos[i].size > threshold:
                pos[i] = da.from_array(pos[i])
                kwargs['func_lib'] = da
        for k in kwargs:
            if type(kwargs[k]) == np.ndarray and pos[kwargs[k]].size > threshold:
                kwargs[k] = da.from_array(kwargs[k])
                kwargs['func_lib'] = da
        return func(*pos, **kwargs)

    return wrapper


@large_file_handler
def np_rolling_sum(arr, n, axis=0, func_lib=np):

    out = func_lib.cumsum(arr, axis=axis)
    slc1 = [slice(None)] * len(arr.shape)
    slc2 = [slice(None)] * len(arr.shape)
    slc1[axis] = slice(n, None)
    slc2[axis] = slice(None, -n)
    out = out[tuple(slc1)] - out[tuple(slc2)]
    shape = list(out.shape)
    shape[axis] = arr.shape[axis] - out.shape[axis]
    out = func_lib.concatenate((np.full(shape, 0), out), axis=axis)
    return np.array(out)

Please feel free to tell me if this could be improved.

Gwalchaved
  • 37
  • 9