
I am trying to streamline a program that involves a set of short tasks that can be done in parallel, where the results of the set of tasks must be compared before moving on to the next step (which again involves a set of short tasks, and then another set, etc.). Because each task is so short, the set-up time makes multiprocessing not worthwhile. I am wondering if there is another way to do these short tasks in parallel that is faster than linear. The only question I can find on this site that describes this problem for Python references this answer on memory sharing, which I don't think answers my question (or if it does, I could not follow how).

To illustrate what I am hoping to do, consider the problem of summing the numbers from 0 to N. (Of course this can be solved analytically; my point is to come up with a low-memory but short CPU-intensive task.) First, the linear approach would simply be:

def numbers(a,b):
    return(i for i in range(a,b))

def linear_sum(a):
    return(sum(numbers(a[0],a[1])))

n = 2000
linear_sum([0, n+1])
#2001000

For threading, I want to break the problem into parts that can then be summed separately and then combined, so the idea would be to get a bunch of ranges over which to sum with something like

import numpy as np

def get_ranges(i, Nprocess = 3):
    di = i // Nprocess
    j = np.append(np.arange(0, i, di), [i+1,])
    return([(j[k], j[k+1]) for k in range(len(j)-1)])

and for some value n >> Nprocess the pseudocode example would be something like

values = get_ranges(n)
x = []
for value in values:
    x.append(do_something_parallel(value))
return(sum(x))

The question, then, is how to implement do_something_parallel. For multiprocessing, we can do something like:

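from multiprocessing import Pool

def mpc_thread_sum(i, Nprocess = 3):
    values = get_ranges(i, Nprocess)  # pass Nprocess through instead of ignoring it
    pool = Pool(Nprocess)  # a pool of worker processes, not threads
    results = pool.map(linear_sum, values)
    pool.close()
    pool.join()
    return(sum(results))

if __name__ == '__main__':  # guard needed where worker processes are spawned (e.g. Windows)
    print(mpc_thread_sum(2000))
    # 2001000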

The graph below shows the performance of the different approaches described. Is there a way to speed up computations for the region where multiprocessing is still slower than linear, or is this the limit of parallelization under Python's GIL? I suspect the answer may be that I am hitting that limit, but I wanted to ask here to be sure. I tried multiprocessing.dummy, asyncio, threading, and ThreadPoolExecutor (from concurrent.futures). For brevity, I have omitted the code, but all show execution times comparable to the linear approach. All are designed for I/O-bound tasks, so they are constrained by the GIL.

[Figure: comparison of different approaches]
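The omitted thread-based attempts are not reproduced here. As a rough idea of the kind of code in question, a hypothetical minimal version of the ThreadPoolExecutor variant might look like the sketch below; the names range_sum and thread_sum are illustrative assumptions, not the code that produced the graph.

```python
from concurrent.futures import ThreadPoolExecutor

def range_sum(bounds):
    # Sum the integers in the half-open interval [a, b).
    a, b = bounds
    return sum(range(a, b))

def thread_sum(n, n_threads=3):
    # Hypothetical helper: sum 0..n by splitting the work across threads.
    step = -(-(n + 1) // n_threads)  # ceiling division
    bounds = [(a, min(a + step, n + 1)) for a in range(0, n + 1, step)]
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        return sum(executor.map(range_sum, bounds))

if __name__ == '__main__':
    print(thread_sum(2000))
```

Because each range_sum call holds the GIL while it runs, the threads take turns rather than running simultaneously, which would be consistent with timings comparable to the linear approach.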

ramzeek

1 Answer


My first observation is that the running time of function numbers can be cut roughly in half by simply defining it as:

def numbers(a, b):
    return range(a, b)
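A quick way to check this on your own machine is a timeit comparison along the following lines. This is only a sketch: the names numbers_gen and numbers_range are introduced here for illustration, and the measured ratio will vary with the Python version and hardware.

```python
import timeit

def numbers_gen(a, b):
    # Original version: wraps range in a generator expression.
    return (i for i in range(a, b))

def numbers_range(a, b):
    # Suggested version: returns the range object directly.
    return range(a, b)

if __name__ == '__main__':
    for fn in (numbers_gen, numbers_range):
        t = timeit.timeit(lambda: sum(fn(0, 2001)), number=10_000)
        print(f'{fn.__name__}: {t:.3f} s for 10,000 calls')
```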

Second, a task that is 100% CPU-bound, such as computing a sum of numbers, can never run significantly faster with multithreading in pure Python, that is, without the aid of a C-language runtime library such as numpy. Contention for the Global Interpreter Lock (GIL) prevents Python bytecode from executing in parallel across threads (and asyncio uses only a single thread to begin with).

Third, the only way to achieve a performance improvement when running pure Python code for a 100% CPU-bound task is with multiprocessing. But there is CPU overhead in creating the process pool, in passing arguments from the main process to the address spaces in which the pool's processes run, and again in passing back the results. So for there to be any performance improvement, the worker function, linear_sum, cannot be trivial; it must require enough CPU processing to warrant the additional overhead just mentioned.
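Since the question involves many successive steps, one of these costs, creating the pool, only has to be paid once if the same pool is reused for every step. The sketch below illustrates that idea under stated assumptions: chunk_sum, chunks, and run_steps are hypothetical helpers, and the step count and pool size are arbitrary. The per-task cost of passing arguments and results remains, so whether this beats the linear version for very short tasks still has to be measured.

```python
import time
from multiprocessing import Pool

def chunk_sum(bounds):
    # Worker: sum the integers in the half-open interval [a, b).
    a, b = bounds
    return sum(range(a, b))

def chunks(n, parts):
    # Split 0..n (inclusive) into `parts` half-open (start, stop) pairs.
    step, extra = divmod(n + 1, parts)
    bounds, start = [], 0
    for i in range(parts):
        stop = start + step + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds

def run_steps(pool, sizes, parts=4):
    # Run one batch of short tasks per entry in `sizes` on an existing pool.
    return [sum(pool.map(chunk_sum, chunks(n, parts))) for n in sizes]

if __name__ == '__main__':
    sizes = [2000] * 20

    t = time.perf_counter()
    with Pool(4) as pool:            # pool start-up cost paid once
        reused = run_steps(pool, sizes)
    print('one pool for all steps:', time.perf_counter() - t)

    t = time.perf_counter()
    fresh = []
    for n in sizes:                  # pool start-up cost paid at every step
        with Pool(4) as pool:
            fresh += run_steps(pool, [n])
    print('new pool per step:     ', time.perf_counter() - t)
    assert reused == fresh
```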

The following benchmark runs the worker function, renamed compute_sum, which now accepts a range as its argument. To further reduce overhead, I have introduced a function, split, that takes the passed range and generates multiple range instances, removing the need to use numpy and to generate arrays. The benchmark computes the sum using a single thread (linear), a multithreading pool, and a multiprocessing pool, and is run twice: once for n = 2000 and once for n = 50_000_000. It displays the elapsed time and the total CPU time across all processes.

For n = 2000, multiprocessing, as expected, performs worse than both linear and multithreading. For n = 50_000_000, multiprocessing's total CPU time is a bit higher than for linear and multithreading as is expected due to the additional aforementioned overhead. But now the elapsed time has gone down significantly. For both values of n, multithreading is a loser.

from multiprocessing.pool import Pool, ThreadPool

import time

def split(iterable, n):
    k, m = divmod(len(iterable), n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def compute_sum(r):
    t = time.process_time()
    return (sum(r), time.process_time() - t)

if __name__ == '__main__':
    for n in (2000, 50_000_000):
        r = range(0, n+1)

        t1 = time.time()
        s, cpu = compute_sum(r)
        elapsed = time.time() - t1
        print(f'n = {n}, linear elapsed time = {elapsed}, total     cpu time = {cpu}, sum = {s}')

        t1 = time.time()
        t2 = time.process_time()
        thread_pool = ThreadPool(4)
        s = 0
        for return_value, process_time in thread_pool.imap_unordered(compute_sum, split(r, 4)):
            s += return_value
        elapsed = time.time() - t1
        cpu = time.process_time() - t2
        print(f'n = {n}, thread pool elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
        thread_pool.close()
        thread_pool.join()

        t1 = time.time()
        t2 = time.process_time()
        pool = Pool(4)
        s = 0
        cpu = 0
        for return_value, process_time in pool.imap_unordered(compute_sum, split(r, 4)):
            s += return_value
            cpu += process_time
        elapsed = time.time() - t1
        cpu += time.process_time() - t2
        print(f'n = {n}, multiprocessing elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
        pool.close()
        pool.join()
        print()

Prints:

n = 2000, linear elapsed time = 0.0, total cpu time = 0.0, sum = 2001000
n = 2000, thread pool elapsed time = 0.00700068473815918, total cpu time = 0.015625, sum = 2001000
n = 2000, multiprocessing elapsed time = 0.13200139999389648, total cpu time = 0.015625, sum = 2001000

n = 50000000, linear elapsed time = 2.0311124324798584, total cpu time = 2.03125, sum = 1250000025000000
n = 50000000, thread pool elapsed time = 2.050999164581299, total cpu time = 2.046875, sum = 1250000025000000
n = 50000000, multiprocessing elapsed time = 0.7579991817474365, total cpu time = 2.359375, sum = 1250000025000000
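To see what the split helper from the benchmark hands to each worker, it can be run on a small range. The function below is copied from the benchmark; the input range(0, 11) and the choice of 4 pieces are just an illustration.

```python
def split(iterable, n):
    # Divide a sliceable sequence into n contiguous pieces whose lengths differ by at most 1.
    k, m = divmod(len(iterable), n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

parts = list(split(range(0, 11), 4))
print(parts)  # [range(0, 3), range(3, 6), range(6, 9), range(9, 11)]

# The pieces cover the original range exactly once.
assert sum(sum(p) for p in parts) == sum(range(0, 11))
```

Slicing a range yields another range, so each piece is a small object that is cheap to pass to a worker regardless of how many numbers it spans.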
Booboo
  • Thank you. I am going to take some time to read through your code as I am just learning about the `multiprocessing` module. My goal was not to make my code more efficient (though I will still absorb your improvements!) but rather to know if there is any way to improve computational time faster than linear when on the left side of my graph before `multiprocessing` outperforms linear. It sounds like you are basically confirming that it cannot be done. Still, this is a good lesson for determining when to and when not to use `multiprocessing`. – ramzeek Nov 25 '21 at 14:23
  • That is correct, it cannot be done with multithreading unless you are using a C-language library or the `numba` package from the `PyPI` repository, which attempts to compile your code on the fly (a just-in-time compiler). And if you are trying to improve computational time, I would say that is the definition of making your code more efficient. And regarding the one-line change I suggested for `numbers`, those are the types of optimizations you should *first* be looking for. – Booboo Nov 25 '21 at 14:31
  • The more efficient we make our computation (the less CPU required by `compute_sum`), the greater the value `n` has to be to make multiprocessing worthwhile. When you have a mixture of I/O (or network access such as fetching URLs), which releases the GIL, and CPU processing, and the I/O time greatly dominates, then multithreading or asyncio is *usually* the right approach, especially if you have, for example, 300 URLs to retrieve and process. That is because creating 300 threads is more efficient than creating 300 processes, and you can share a `requests.Session` object across the threads. (more...) – Booboo Nov 25 '21 at 14:44
  • But if the CPU processing portion is not so insignificant, the optimum solution might be to create a thread pool of 300 threads *and* a multiprocessing pool, `multiprocessing_pool`, with the default size (the number of CPU cores you have), and pass `multiprocessing_pool` to the thread pool worker function. The worker performs the CPU-intensive processing by putting it in a separate function, e.g. `process_reply`, and then calling, for example, `result = multiprocessing_pool.apply(process_reply, args=(data,))`. – Booboo Nov 25 '21 at 14:50
  • Thank you for the follow-up! Historically most of what I have needed to do have been jobs that take seconds but right now I'm working on a project where the computations can take hours, so I'm starting to pay attention to how my programming achieves its goal, not just achieving it. – ramzeek Nov 25 '21 at 14:55
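The thread pool plus multiprocessing pool arrangement described in the comments above could be sketched roughly as follows. This is an illustration under assumptions, not code from the answer: fetch is a hypothetical stand-in for the I/O step (it just sleeps), process_reply is a made-up CPU-bound function, and the pool sizes are arbitrary.

```python
import time
from functools import partial
from multiprocessing.pool import Pool, ThreadPool

def process_reply(data):
    # CPU-bound step (hypothetical stand-in); intended to run in a worker process.
    return sum(x * x for x in data)

def fetch(i):
    # Hypothetical stand-in for I/O such as fetching a URL; sleeping releases the GIL.
    time.sleep(0.01)
    return range(i * 1000)

def worker(cpu_pool, i):
    data = fetch(i)
    # Blocks only this thread while a process in cpu_pool does the computation.
    return cpu_pool.apply(process_reply, args=(data,))

def run(cpu_pool, n_tasks, n_threads=8):
    # Threads handle the I/O-dominated part and hand CPU work to cpu_pool.
    with ThreadPool(n_threads) as thread_pool:
        return thread_pool.map(partial(worker, cpu_pool), range(n_tasks))

if __name__ == '__main__':
    with Pool() as multiprocessing_pool:   # default size: number of CPU cores
        print(run(multiprocessing_pool, n_tasks=20))
```

The thread pool can be much larger than the process pool because its threads spend most of their time waiting, while the process pool is sized to the number of cores that can actually compute in parallel.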