
I am trying to understand how multiprocessing.Pool works, and I have developed a minimal example that illustrates my question. Briefly, I am using pool.map to parallelize a CPU-bound function operating on an array, following the example Dead simple example of using Multiprocessing Queue, Pool and Locking. When I follow that pattern, I get only a modest speedup with 4 cores, but if I instead manually chunk the array into num_threads pieces and then use pool.map over the chunks, I find speedup factors that vastly exceed 4x, which makes no sense to me. Details follow.

First, the function definitions.

import multiprocessing

import numpy as np

def take_up_time():
    """Busy-loop to simulate fixed per-call CPU overhead."""
    n = 1e3
    while n > 0:
        n -= 1

def count_even_numbers(x):
    take_up_time()
    return np.where(np.mod(x, 2) == 0, 1, 0)

Now define the functions we'll benchmark.

First the function that runs in serial:

def serial(arr):
    # list() is needed on Python 3, where map() returns a lazy iterator
    return np.sum(list(map(count_even_numbers, arr)))

Now the function that uses Pool.map in the "standard" way:

def parallelization_strategy1(arr):
    num_threads = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_threads)
    result = pool.map(count_even_numbers, arr)
    pool.close()
    return np.sum(result)

Finally, the second strategy, in which I manually chunk the array and then run Pool.map over the chunks (splitting solution adapted from Python numpy split array into unequal subarrays):

def split_padded(a, n):
    """Simple helper function for strategy 2: pad `a` to a length
    divisible by n, split into n pieces, then trim the padding."""
    padding = (-len(a)) % n
    if padding == 0:
        return np.split(a, n)
    sub_arrays = np.split(np.concatenate((a, np.zeros(padding))), n)
    sub_arrays[-1] = sub_arrays[-1][:-padding]
    return sub_arrays
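If it helps to see the chunking concretely, here is split_padded on a 10-element array with n=4 (the function is repeated so this sketch runs standalone):

```python
import numpy as np

# Repeating split_padded from above so this snippet runs on its own.
def split_padded(a, n):
    padding = (-len(a)) % n
    if padding == 0:
        return np.split(a, n)
    sub_arrays = np.split(np.concatenate((a, np.zeros(padding))), n)
    sub_arrays[-1] = sub_arrays[-1][:-padding]
    return sub_arrays

chunks = split_padded(np.arange(10), 4)
print([len(c) for c in chunks])  # [3, 3, 3, 1]
# The original values 0..9 all survive (note: padding promotes dtype to float)
print(np.concatenate(chunks))
```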

def parallelization_strategy2(arr):
    num_threads = multiprocessing.cpu_count()
    sub_arrays = split_padded(arr, num_threads)
    pool = multiprocessing.Pool(num_threads)
    result = pool.map(count_even_numbers, sub_arrays)
    pool.close()
    return np.sum(np.array(result))

Here is my array input:

npts = 1e3
arr = np.arange(npts)

Now I use the IPython %timeit magic to run my timings, and for 1e3 points I get the following:

  • serial: 10 loops, best of 3: 98.7 ms per loop
  • parallelization_strategy1: 10 loops, best of 3: 77.7 ms per loop
  • parallelization_strategy2: 10 loops, best of 3: 22 ms per loop

Since I have 4 cores, strategy 1 gives a disappointingly modest speedup, and strategy 2 is suspiciously faster than the maximum possible 4x speedup.

When I increase npts to 1e4, the results are even more perplexing:

  • serial: 1 loops, best of 3: 967 ms per loop
  • parallelization_strategy1: 1 loops, best of 3: 596 ms per loop
  • parallelization_strategy2: 10 loops, best of 3: 22.9 ms per loop

So the two sources of confusion are:

  1. Strategy 2 is way faster than the naive theoretical limit
  2. For some reason, %timeit with npts=1e4 only triggers 1 loop for serial and strategy 1, but 10 loops for strategy 2.
2 Answers


Turns out your example fits perfectly in the Pythran model. Compiling the following source code count_even.py:

#pythran export count_even(int [:])
import numpy as np

def count_even_numbers(x):
    return np.where(np.mod(x, 2) == 0, 1, 0)

def count_even(arr):
    s = 0
    #omp parallel for reduction(+:s)
    for elem in arr:
        s += count_even_numbers(elem)
    return s

with the command line (-fopenmp activates the handling of the OpenMP annotations):

pythran count_even.py -fopenmp

And running timeit over this already yields massive speedups thanks to the conversion to native code:

Without Pythran

$ python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
verryyy long, more than several minutes :-/

With Pythran, one core

$ OMP_NUM_THREADS=1 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 10.3 msec per loop

With Pythran, two cores:

$ OMP_NUM_THREADS=2 python -m timeit -s 'import numpy as np; arr = np.arange(1e7, dtype=int); from count_even import count_even' 'count_even(arr)'
100 loops, best of 3: 5.5 msec per loop

twice as fast, parallelization is working :-)

Note that OpenMP enables multi-threading, not multi-processing.


Your strategies aren't doing the same amount of work!

In the first strategy, Pool.map iterates over the array, so count_even_numbers is called once for every array element (since the array is one-dimensional). With npts=1e3, that means the take_up_time overhead runs 1,000 times.

The second strategy maps over a list of sub-arrays, so count_even_numbers is called only once per sub-array, i.e. num_threads times in total: take_up_time runs just 4 times, and np.where vectorizes over each whole chunk. That is why strategy 2 blows past the 4x theoretical limit: it is doing far less work, not parallelizing the same work better.
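To see the call-count difference without any multiprocessing at all, here is a serial sketch that replaces take_up_time with a counter (plain map and a list comprehension stand in for Pool.map):

```python
import numpy as np

CALLS = 0  # stands in for take_up_time(): counts how often the overhead runs

def count_even_numbers(x):
    global CALLS
    CALLS += 1
    return np.where(np.mod(x, 2) == 0, 1, 0)

arr = np.arange(1000)

# Strategy 1: one call per element -> the fixed overhead runs 1000 times
CALLS = 0
result1 = np.sum(list(map(count_even_numbers, arr)))
calls1 = CALLS

# Strategy 2: one call per chunk -> the overhead runs only 4 times,
# and np.where vectorizes over each whole chunk
CALLS = 0
result2 = np.sum(np.array([count_even_numbers(c) for c in np.split(arr, 4)]))
calls2 = CALLS

print(result1, result2)  # 500 500 -- same answer either way
print(calls1, calls2)    # 1000 4  -- wildly different amounts of overhead
```

Run under Pool.map, strategy 2 therefore has only 4 expensive calls to spread across 4 workers, which is how it appears to beat the nominal 4x limit.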

Roland Smith
  • Good point, Roland, that resolves the mystery and seems obvious in hindsight. Thanks! – aph Mar 16 '16 at 22:34