7

I'm looking for an efficient way to segment numpy arrays into overlapping chunks. I know that numpy.lib.stride_tricks.as_strided is probably the way to go, but I can't seem to wrap my head around its usage in a generalized function that works on arrays with arbitrary shape. Here are some examples for specific applications of as_strided.

This is what I would like to have:

import numpy as np
from numpy.lib.stride_tricks import as_strided

def segment(arr, axis, new_len, step=1, new_axis=None):
    """ Segment an array along some axis.

    Parameters
    ----------
    arr : array-like
        The input array.

    axis : int
        The axis along which to segment.

    new_len : int
        The length of each segment.

    step : int, default 1
        The offset between the start of each segment.

    new_axis : int, optional
        The position where the newly created axis is to be inserted. By
        default, the axis will be added at the end of the array.

    Returns
    -------
    arr_seg : array-like
        The segmented array.
    """

    # calculate shape after segmenting
    new_shape = list(arr.shape)
    new_shape[axis] = (new_shape[axis] - new_len + step) // step
    if new_axis is None:
        new_shape.append(new_len)
    else:
        new_shape.insert(new_axis, new_len)

    # TODO: calculate new strides
    strides = magic_command_returning_strides(...)

    # get view with new strides
    arr_seg = as_strided(arr, new_shape, strides)

    return arr_seg.copy()

So I would like to specify the axis that is going to be cut into segments, the length of the segments and the step between them. Additionally, I'd like to pass the position where the new axis is inserted as a parameter. The only thing that's missing is the calculation of the strides.

I'm aware that this might not work this way directly with as_strided, i.e. I might need to implement a subroutine that returns a strided view with step=1 and new_axis in a fixed position and then slice with the desired step and transpose afterwards.

Here's a piece of code that works, but is obviously quite slow:

def segment_slow(arr, axis, new_len, step=1, new_axis=None):
    """ Segment an array along some axis. """

    # calculate shape after segmenting
    new_shape = list(arr.shape)
    new_shape[axis] = (new_shape[axis] - new_len + step) // step
    if new_axis is None:
        new_shape.append(new_len)
    else:
        new_shape.insert(new_axis, new_len)

    # check if the new axis is inserted before the axis to be segmented
    if new_axis is not None and new_axis <= axis:
        axis_in_arr_seg = axis + 1
    else:
        axis_in_arr_seg = axis

    # pre-allocate array
    arr_seg = np.zeros(new_shape, dtype=arr.dtype)

    # setup up indices
    idx_old = [slice(None)] * arr.ndim
    idx_new = [slice(None)] * len(new_shape)

    # get order of transposition for assigning slices to the new array
    order = list(range(arr.ndim))
    if new_axis is None:
        order[-1], order[axis] = order[axis], order[-1]
    elif new_axis > axis:
        order[new_axis-1], order[axis] = order[axis], order[new_axis-1]

    # loop over the axis to be segmented
    for n in range(new_shape[axis_in_arr_seg]):
        idx_old[axis] = n * step + np.arange(new_len)
        idx_new[axis_in_arr_seg] = n
        arr_seg[tuple(idx_new)] = np.transpose(arr[idx_old], order)

    return arr_seg

Here's a test for the basic functionality:

import numpy.testing as npt    

arr = np.array([[0, 1, 2, 3],
                [4, 5, 6, 7],
                [8, 9, 10, 11]])

arr_seg_1 = segment_slow(arr, axis=1, new_len=3, step=1)
arr_target_1 = np.array([[[0, 1, 2], [1, 2, 3]],
                         [[4, 5, 6], [5, 6, 7]],
                         [[8, 9, 10], [9, 10, 11]]])

npt.assert_allclose(arr_target_1, arr_seg_1)

arr_seg_2 = segment_slow(arr, axis=1, new_len=3, step=1, new_axis=1)
arr_target_2 = np.transpose(arr_target_1, (0, 2, 1))

npt.assert_allclose(arr_target_2, arr_seg_2)

arr_seg_3 = segment_slow(arr, axis=0, new_len=2, step=1)
arr_target_3 = np.array([[[0, 4], [1, 5], [2, 6], [3, 7]],
                         [[4, 8], [5, 9], [6, 10], [7, 11]]])

npt.assert_allclose(arr_target_3, arr_seg_3)

Any help would be greatly appreciated!

phausamann
  • 115
  • 7
  • 1
    Congratulations for the very well-formed question! :) – Andras Deak -- Слава Україні Feb 09 '18 at 09:05
  • [My attempt at a canonical answer](https://stackoverflow.com/questions/45960192/using-numpy-as-strided-function-to-create-patches-tiles-rolling-or-sliding-w) does all these except rolling the new axis to the specified position. Should be easy enough to add a `np.rollaxis` to the end. – Daniel F Feb 09 '18 at 09:28
  • Thanks @DanielF, I've managed to implement the functionality I need with a wrapper around your function. I'll post an answer to my question based on your approach. – phausamann Feb 09 '18 at 13:45

1 Answers1

1

Based on DanielF's comment and his answer here, I implemented my function like this:

def segment(arr, axis, new_len, step=1, new_axis=None, return_view=False):
    """ Segment an array along some axis.

    Parameters
    ----------
    arr : array-like
        The input array.

    axis : int
        The axis along which to segment.

    new_len : int
        The length of each segment.

    step : int, default 1
        The offset between the start of each segment.

    new_axis : int, optional
        The position where the newly created axis is to be inserted. By
        default, the axis will be added at the end of the array.

    return_view : bool, default False
        If True, return a view of the segmented array instead of a copy.

    Returns
    -------
    arr_seg : array-like
        The segmented array.
    """

    old_shape = np.array(arr.shape)

    assert new_len <= old_shape[axis],  \
        "new_len is bigger than input array in axis"
    seg_shape = old_shape.copy()
    seg_shape[axis] = new_len

    steps = np.ones_like(old_shape)
    if step:
        step = np.array(step, ndmin = 1)
        assert step > 0, "Only positive steps allowed"
        steps[axis] = step

    arr_strides = np.array(arr.strides)

    shape = tuple((old_shape - seg_shape) // steps + 1) + tuple(seg_shape)
    strides = tuple(arr_strides * steps) + tuple(arr_strides)

    arr_seg = np.squeeze(
        as_strided(arr, shape = shape, strides = strides))

    # squeeze will move the segmented axis to the first position
    arr_seg = np.moveaxis(arr_seg, 0, axis)

    # the new axis comes right after
    if new_axis is not None:
        arr_seg = np.moveaxis(arr_seg, axis+1, new_axis)
    else:
        arr_seg = np.moveaxis(arr_seg, axis+1, -1)

    if return_view:
        return arr_seg
    else:
        return arr_seg.copy()

This works well for my case of one-dimensional segments, however, I'd recommend anyone looking for a way that works for segments of arbitrary dimensionality to check out the code in the linked answer.

phausamann
  • 115
  • 7
  • Feel free to upvote that question so other people find it too! Sorry I didn't have time to write out a complete answer. Also beware the `np.moveaxis` will create a copy of the `as_strided` view, so this method can cause memory errors if the view you create is much bigger than the original array. – Daniel F Feb 12 '18 at 11:21
  • The [documentation](https://docs.scipy.org/doc/numpy/reference/generated/numpy.moveaxis.html) says that `moveaxis` returns a view... – phausamann Feb 12 '18 at 13:58
  • woops! I don't know which command I was thinking of then. Should be good! – Daniel F Feb 13 '18 at 11:19