13

How could the application of a function to the elements of a NumPy array through numpy.apply_along_axis() be parallelized so as to take advantage of multiple cores? This seems to be a natural thing to do, in the common case where all the calls to the function being applied are independent.

In my particular case—if this matters—, the axis of application is axis 0: np.apply_along_axis(func, axis=0, arr=param_grid) (np being NumPy).

I had a quick look at Numba, but I can't seem to get this parallelization, with a loop like:

@numba.jit(parallel=True)
result = np.empty(shape=params.shape[1:])
for index in np.ndindex(*result.shape)):  # All the indices of params[0,...]
    result[index] = func(params[(slice(None),) + index])  # Applying func along axis 0

There is also apparently a compilation option in NumPy for parallelization through OpenMP, but it does not seem to be accessible through MacPorts.

One can also think of maybe cutting the array in a few pieces and using threads (so as to avoid copying the data) and applying the function on each piece in parallel. This is more complex than what I am looking for (and might not work if the Global Interpreter Lock is not released enough).

It would be very nice to be able to use multiple cores in a simple way for simple parallelizable tasks like applying a function to all the elements of an array (which is essentially what is needed here, with the small complication that function func() takes a 1D array of parameters).

Eric O. Lebigot
  • 91,433
  • 48
  • 218
  • 260
  • `apply_along_axis` is pure Python code, doing what you show, except it transposes the axis of interest to the end, and does `ndindex(arr.shape[:-1])` on the rest. Alternatives have been discussed in posts like https://stackoverflow.com/questions/45067268/numpy-vectorized-2d-array-operation-error – hpaulj Aug 05 '17 at 22:05
  • Since a n-d problem can be reshaped to 2d (your axis of interest plus the rest), the basic problem is a 1d list comprehension. Iterate over the rows. Another SO question: https://stackoverflow.com/questions/44239498/how-to-apply-a-generic-function-over-numpy-rows – hpaulj Aug 05 '17 at 22:13
  • 1
    I wish these StackOverflow questions contained a solution that uses multiple cores that I could use! Now, I am not sure how a Python list comprehension succeeds in being faster than `np.apply_along_axis()`, but at least maybe the single-core version can be made faster by exploring simple alternatives to `np.apply_along_axis()`… – Eric O. Lebigot Aug 05 '17 at 23:12
  • Various SO have looked at using `multiprocessing.pool.map` on the rows of an array. – hpaulj Aug 06 '17 at 05:03

1 Answers1

22

Alright, I worked it out: an idea is to use the standard multiprocessing module and split the original array in just a few chunks (so as to limit communication overhead with the workers). This can be done relatively easily as follows:

import multiprocessing

import numpy as np

def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs):
    """
    Like numpy.apply_along_axis(), but takes advantage of multiple
    cores.
    """        
    # Effective axis where apply_along_axis() will be applied by each
    # worker (any non-zero axis number would work, so as to allow the use
    # of `np.array_split()`, which is only done on axis 0):
    effective_axis = 1 if axis == 0 else axis
    if effective_axis != axis:
        arr = arr.swapaxes(axis, effective_axis)

    # Chunks for the mapping (only a few chunks):
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
              for sub_arr in np.array_split(arr, multiprocessing.cpu_count())]

    pool = multiprocessing.Pool()
    individual_results = pool.map(unpacking_apply_along_axis, chunks)
    # Freeing the workers:
    pool.close()
    pool.join()

    return np.concatenate(individual_results)

where the function unpacking_apply_along_axis() being applied in Pool.map() is separate as it should (so that subprocesses can import it), and is simply a thin wrapper that handles the fact that Pool.map() only takes a single argument:

def unpacking_apply_along_axis((func1d, axis, arr, args, kwargs)):
    """
    Like numpy.apply_along_axis(), but with arguments in a tuple
    instead.

    This function is useful with multiprocessing.Pool().map(): (1)
    map() only handles functions that take a single argument, and (2)
    this function can generally be imported from a module, as required
    by map().
    """
    return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)

(in Python 3, the argument unpacking must be done manually:

def unpacking_apply_along_axis(all_args):
    """…"""
    (func1d, axis, arr, args, kwargs) = all_args
    …

because argument unpacking was removed).

In my particular case, this resulted in a 2x speedup on 2 cores with hyper-threading. A factor closer to 4x would have been nicer, but the speed up is already nice, in just a few lines of code, and it should be better for machines with more cores (which are quite common). Maybe there is a way of avoiding data copies and using shared memory (maybe through the multiprocessing module itself)?

Eric O. Lebigot
  • 91,433
  • 48
  • 218
  • 260
  • 3
    Thanks for the code in your answer above, this is promising for me as I'm also facing the same issue for an embarrassingly parallelizable problem, and `np.apply_along_axis()` only gets me so far. Another idea I'll add is that dask can help with parallelization, supposedly so anyway as I've not managed to get it working yet since it requires the use of dask arrays instead of numpy arrays and not all numpy functionality is available from the dask API. See https://dask.org/ – James Adams Nov 15 '18 at 16:12
  • 1
    Dask is indeed a good tool to try. Vaex might be another one (https://github.com/maartenbreddels/vaex). – Eric O. Lebigot Nov 15 '18 at 16:33
  • 1
    After putting in play a modified version of the code you provided above I was able to rework my multiprocessing approach (I was previously using a different sort of loop that didn't take advantage of splitting and concatenation), and doing so has given me similar performance but with a much-reduced memory footprint. So thanks again, this has helped me quite a bit. – James Adams Nov 15 '18 at 18:18
  • I get a syntax error on the function definition of `unpacking_apply_along_axis` probably related to the use of double ((. The syntax error occurs when using Python 3, in Python 2 it seems to work. I couldn't find out why exactly. Any help on this? – Basileios Jun 11 '19 at 09:40
  • 1
    Indeed. I just added the Python 3 case in the answer. – Eric O. Lebigot Jun 12 '19 at 10:00
  • This works great. In the case of a func1d with two return values, one would need to again swap back the axes (`if effective_axis != axis`) before returning the final results. When doing so, the behaviour is (almost?) identical to calling func1d, which is great. Maybe you could add this to your solution. – Basileios Jul 16 '19 at 12:48
  • Am I understanding it correctly that when i swap `numpy.apply_along_axis` with `parallel_apply_along_axis` defined in this answer, it should "just work"? I am getting `RuntimeError: An attempt has been made to start a new process before the current process has finished its bootstrapping phase.` on the line `pool = multiprocessing.Pool()`... – Brambor Feb 17 '21 at 20:45
  • Yes, it's supposed to be a drop-in replacement, if I remember correctly. Would the question https://stackoverflow.com/questions/18204782/runtimeerror-on-windows-trying-python-multiprocessing apply to the error you see? – Eric O. Lebigot Feb 18 '21 at 13:50
  • I'm trying to apply this logic to interpolating a 3D masked array along the 0 axis. However, I can't get your solution working. What is the meaning of switching the axis? After switching them, the array_split function does not behave properly (the first returned sub_array is the full array, the other sub_arrays are empty). – WaterFox Mar 27 '22 at 15:33
  • Swapping axes simply puts the axis along which to apply the splitting to axis 0, as this is the axis on which the array splitting function works. What is the shape of your input array? – Eric O. Lebigot Mar 29 '22 at 13:02
  • In python 3+ might want to add the following to the unpacking method: ``return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)`` – Quetzalcoatl Feb 08 '23 at 00:03
  • Re. the `unpacking_apply_along_axis` function: I've used `functools.partial` with Pool.map before to get around the one-argument problem. Is there some reason why functools.partial is not preferred in this case? – sh37211 Mar 29 '23 at 20:52