
I've been reading threads like this one, but none of them seems to work for my case. I'm trying to parallelize the following toy example, which fills a Numpy array inside a for loop using multiprocessing in Python:

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        for idx, val in enumerate(range(1, n+1)):
            result = pool.apply_async(func1, [val])
            my_array[idx] = result.get()
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(60000, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

The multiprocessing-based code seems to work and gives the right results. However, it takes far longer than the non-parallelized version. Here is the output of both versions:

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Normal time: 0.01605963706970215

[2.00000e+00 5.00000e+00 1.00000e+01 ... 3.59976e+09 3.59988e+09
 3.60000e+09]
Time based on multiprocessing: 2.8775112628936768

My intuition tells me that there should be a better way of capturing results from pool.apply_async(). What am I doing wrong? What is the most efficient way to accomplish this? Thanks.

Antonio Serrano
  • Why do you expect this to be improved with multiprocessing? Is this a toy example? Because you should be using a vectorized operation for this, not an interpreter-level loop (let alone multiprocessing); a vectorized sketch follows these comments. – juanpa.arrivillaga Apr 22 '21 at 18:28
  • "My intuition tells me that it should be a better way of capturing results from pool.apply_async()." can you elaborate on what you mean by this? – juanpa.arrivillaga Apr 22 '21 at 18:29
  • In my real program (not in this toy example), I use a function like "func2" that calls an external program to generate some images inside a for loop. I want those calls to run in parallel, so I can't use a vectorized implementation. What I'm trying to achieve with this toy example is something similar to [this tutorial](https://www.machinelearningplus.com/python/parallel-processing-python/), but using numpy arrays instead of lists. – Antonio Serrano Apr 22 '21 at 18:42
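
For reference, the vectorized version suggested in the first comment might look like this for the toy example (a minimal sketch; it does not apply to the external-program workload described above):

import numpy as np

n = 60000
vals = np.arange(1, n + 1, dtype=np.float64)  # 1..n as a float array
my_array = vals**2 + 1                        # func1 applied element-wise, no Python-level loop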

2 Answers

5

Creating processes is expensive. On my machine it takes at least several hundred microseconds per process created. Moreover, the multiprocessing module copies the data to be computed between processes and then gathers the results from the process pool. This inter-process communication is very slow too. The problem is that your computation is trivial and can be done very quickly, likely much faster than all the introduced overhead. The multiprocessing module is only useful when you are working with quite small datasets and performing intensive computations (compared to the amount of data transferred).
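
A rough way to see the process-creation cost on your own machine (a minimal sketch; the exact numbers depend on the OS and Python version):

import time
from multiprocessing import Pool

if __name__ == '__main__':
    start = time.time()
    with Pool(processes=6) as pool:  # spawn the worker processes
        pass                         # do no work at all
    print("Pool startup/teardown: {:.4f}s".format(time.time() - start))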

Fortunately, when it comes to numerical computations with Numpy, there is a simple and fast way to parallelize your application: the Numba JIT. Numba can parallelize code if you explicitly use its parallel constructs (parallel=True and prange). It uses threads rather than heavyweight processes, and the threads work in shared memory. Numba can overcome the GIL as long as your code works on native types and Numpy arrays rather than pure Python dynamic objects (lists, big integers, classes, etc.). Here is an example:

import numpy as np
import numba as nb
import time

@nb.njit
def func1(x, y=1):
    return x**2 + y

@nb.njit('float64[:](int64)', parallel=True)
def func2(n):
    my_array = np.zeros(n)
    # prange tells Numba the iterations are independent and can be
    # distributed across threads
    for i in nb.prange(1, n+1):
        my_array[i-1] = func1(i)
    return my_array

def main():
    start = time.time()
    my_array = func2(60000)
    end = time.time()
    
    print(my_array)
    print("Numba time: {}\n".format(end-start))

if __name__ == '__main__':
    main()

Because Numba compiles the code at runtime, it is able to fully optimize the loop, resulting in a time close to 0 seconds in this case.
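
One benchmarking caveat (an assumption about how you might adapt this): the explicit signature string above makes Numba compile eagerly, at decoration time. If you drop the signature and let Numba compile lazily, the first call pays the compilation cost, so it is common to warm the function up before timing it:

import numpy as np
import numba as nb
import time

@nb.njit(parallel=True)      # no signature: compiled lazily, on first call
def func2(n):
    my_array = np.zeros(n)
    for i in nb.prange(1, n+1):
        my_array[i-1] = i**2 + 1   # func1 inlined for brevity
    return my_array

func2(10)                    # warm-up call triggers (and pays for) compilation

start = time.time()
func2(60000)                 # only the compiled code is measured now
print("Warm Numba time: {}".format(time.time() - start))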

Jérôme Richard
  • This is probably a much better way to do it, but to be fair: I guess the original code would work if they didn't call `result.get()` within the `for` loop, right? – thisisalsomypassword Apr 22 '21 at 19:50
  • That's really cool. Thank you! BTW, I couldn't use numba/njit if func2 had an object type as one of its arguments, right? – Antonio Serrano Apr 22 '21 at 19:57
  • @thisisalsomypassword, if you didn't call `result.get()` inside the loop, how would you fill the array with each result? – Antonio Serrano Apr 22 '21 at 20:04
  • By collecting the `AsyncResult` objects in a list within the loop and then calling `AsyncResult.get()` after all processes have started on each result object. – thisisalsomypassword Apr 22 '21 at 20:14
  • @AntonioSerrano Well, you can use Numba with the GIL in this case, but it will not be very fast, and parallelization will only be possible on parts of the code that do not work on pure Python types. If you are dealing with a lot of Python types, then you can use Cython. While Cython provides a `prange` parallel built-in, parallelism is not possible on CPython types because of the GIL, and sadly no package can actually overcome this (it is a fundamental issue of CPython). You could try PyPy otherwise, but AFAIK its parallel support is quite inefficient so far. – Jérôme Richard Apr 23 '21 at 16:26
0

Here is the solution proposed by @thisisalsomypassword that improves my initial proposal. That is, "collecting the AsyncResult objects in a list within the loop and then calling AsyncResult.get() after all processes have started on each result object":

import numpy as np
from multiprocessing import Pool
import time


def func1(x, y=1):
    time.sleep(0.1)
    return x**2 + y

def func2(n, parallel=False):
    my_array = np.zeros((n))
    
    # Parallelized version:
    if parallel:
        pool = Pool(processes=6)
        ####### HERE COMES THE CHANGE #######
        results = [pool.apply_async(func1, [val]) for val in range(1, n+1)]
        for idx, val in enumerate(results):
            my_array[idx] = val.get()
        #######
        pool.close()

    # Not parallelized version:
    else:
        for i in range(1, n+1):
            my_array[i-1] = func1(i)

    return my_array

def main():
    start = time.time()
    my_array = func2(600)
    end = time.time()
    
    print(my_array)
    print("Normal time: {}\n".format(end-start))

    start_parallel = time.time()
    my_array_parallelized = func2(600, parallel=True)
    end_parallel = time.time()

    print(my_array_parallelized)
    print("Time based on multiprocessing: {}".format(end_parallel-start_parallel))


if __name__ == '__main__':
    main()

Now it works. Time is reduced considerably with multiprocessing:

Normal time: 60.107836008071
Time based on multiprocessing: 10.049324989318848    

time.sleep(0.1) was added to func1 so that the task is no longer trivially small; otherwise the multiprocessing overhead would again outweigh the computation.
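
As a side note, since the results are needed in submission order anyway, `Pool.map` expresses the same idea more compactly (a sketch, assuming the same `func1` as above; `map` blocks until all results are ready and preserves input order):

    if parallel:
        with Pool(processes=6) as pool:
            my_array[:] = pool.map(func1, range(1, n+1))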

Antonio Serrano