1

I'm working on a library function that uses concurrent.futures to spread network I/O across multiple threads. Due to the Python GIL I'm experiencing a slowdown on some workloads (large files), so I want to switch to multiple processes. However, multiple processes will also be less than ideal for some other workloads (many small files). I'd like to split the difference and have multiple processes, each with their own thread pool.

The problem is job queueing - concurrent.futures doesn't seem to be set up to queue jobs properly for multiple processes that each can handle multiple jobs at once. While breaking up the job list into chunks ahead of time is an option, it would work much more smoothly if jobs flowed to each process asynchronously as their individual threads completed a task.

How can I efficiently queue jobs across multiple processes and threads using this or a similar API? Aside from writing my own executor, is there any obvious solution I'm overlooking? Or is there any prior art for a mixed process/thread executor?

Andrew Gorcester
  • 19,595
  • 7
  • 57
  • 73
  • I think you create a single queue (using a `multiprocessing.Manager`) for feeding jobs to your process workers. You create a `multiprocessing.Pool` with N processes, and in each process you pull items from the queue and submit them to a `concurrent futures.ThreadPoolExecutor` with M max workers. – larsks Jan 27 '23 at 22:36
  • @larsks A `multiprocessing.Queue` instance would be much more efficient. Each pool process can be initialized with the queue instance using the *initializer* and *initargs* arguments to the `ProcessPoolExecutor` initializer. – Booboo Jan 29 '23 at 15:25

1 Answers1

2

If I understand what you are trying to do you basically have a lot of jobs that are suitable for multithreading except that there is some CPU-intensive work. So your idea is to create multiple threading pools in multiple child processes so that there is less GIL contention. Of course, in a any given child process the CPU-intensive code will only be executed serially (assuming it is Python byte code), so it's not a perfect solution.

One approach is to just create a very large multiprocessing pool (larger than the number of cores you have). There is a limit to how may processes you can create and their creation is expensive. But since most of the time they will be waiting for I/O to complete the I/O portion will execute concurrently.

A better approach would be to create a multiprocessing pool whose executor can be passed to a multithreading pool worker function along with the other required arguments. This is an inversion of what you were planning to do. When the worker function has a CPU-intensive work to perform, it can submit that work to the passed multiprocessing pool executor and block for the returned result. In that way you get the optimal parallelism you can achieve given the number of cores you have. This would be my recommendation.. For example:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

def cpu_intensive(x):
    return x ** 2

def thread_worker(process_executor, x):
    import time

    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    future = process_executor.submit(cpu_intensive, x)
    squared = future.result() # Just for demo purposes
    return x, squared

if __name__ == '__main__':
    input_args = (100, 200, 300, 400, 500)
    with ProcessPoolExecutor() as process_executor:
        with ThreadPoolExecutor(10) as thread_executor:
            # Each input results in multiple threading jobs being created:
            futures = [
                thread_executor.submit(thread_worker, process_executor, input_arg + i)
                    for input_arg in input_args
                        for i in range(5)
            ]
            results = [future.result() for future in as_completed(futures)]
    print(results)

Prints:

[(204, 41616), (202, 40804), (203, 41209), (200, 40000), (201, 40401), (104, 10816), (103, 10609), (102, 10404), (101, 10201), (100, 10000), (402, 161604), (303, 91809), (302, 91204), (301, 90601), (400, 160000), (300, 90000), (304, 92416), (403, 162409), (401, 160801), (404, 163216), (500, 250000), (501, 251001), (504, 254016), (503, 253009), (502, 252004)]

But if you wanted to go along with your original idea or for some reason the above framework does not fit your actual situation, perhaps something like the following might work:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing import Queue
from queue import Empty

def init_pool_processes(q):
    global queue, thread_pool_executor

    queue = q
    thread_pool_executor = ThreadPoolExecutor(10) # or some appropriate pool size


def thread_worker(x):
    import time

    # Do something with x
    ...
    time.sleep(.1) # simulate time taken
    return x # Just for demo purposes

def process_worker(y):
    # This results in some number of threadpool jobs:
    futures = [thread_pool_executor.submit(thread_worker, y + i) for i in range(5)]
    for future in as_completed(futures):
        queue.put(future.result())


if __name__ == '__main__':
    results = []

    def get_results(result):
        try:
            while True:
                result = queue.get_no_wait()
                results.append(result)
        except Empty:
            pass

    input_args = (100, 200, 300, 400, 500)
    queue = Queue()
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(queue,)) as executor:
        futures = [executor.submit(process_worker, input_arg) for input_arg in input_args]
        for future in as_completed(futures):
            # Every time a job submitted to the process pool completes we can
            # look for more results:
            try:
                while True:
                    result = queue.get_nowait()
                    results.append(result)
            except Empty:
                pass
    print(results)

Prints:

[102, 201, 101, 203, 103, 202, 200, 100, 104, 204, 504, 301, 404, 502, 304, 403, 302, 501, 503, 500, 402, 303, 401, 300, 400]
Booboo
  • 38,656
  • 3
  • 37
  • 60