
I have a script with two parts: one that is I/O-heavy (API calls), and another that is CPU-heavy (processing the API output).

I would like to make the API calls with multithreading so they run concurrently, and, as the results come in, have them processed with multiprocessing.

My code looks like this:

futures = []
with ThreadPoolExecutor() as executor:
    for start, end, total, qs in batch_generator:
        logger.info(f"Processing players {start + 1} to {end} of {total}")
        futures.append(
            executor.submit(self.my_api_calls_func, qs)
        )

    with ProcessPoolExecutor() as p_executor:
        for future in as_completed(futures):
            print("completed thread task")
            p_executor.submit(self.my_CPU_heavy_task, *future.result())
            print("player batch submitted to process pool")

However, I noticed that the results aren't handled concurrently; my logs look like this:

Processing players 1 to 500 of 5000
Processing players 501 to 1000 of 5000
Processing players 1001 to 1500 of 5000
Processing players 1501 to 2000 of 5000
...
Processing players 4501 to 5000 of 5000
completed thread task
player batch submitted to process pool
completed thread task
player batch submitted to process pool
completed thread task
player batch submitted to process pool
...

The tasks submitted to p_executor never run; the program just hangs.

So when I interrupt with Ctrl+C, the traceback shows this:

^CProcess ForkProcess-4:
Process ForkProcess-1:
Process ForkProcess-8:
Process ForkProcess-7:
Process ForkProcess-5:
Process ForkProcess-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
Traceback (most recent call last):
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
Traceback (most recent call last):
Process ForkProcess-6:
Traceback (most recent call last):
  File "manage.py", line 22, in <module>
    main()
  File "manage.py", line 18, in main
    execute_from_command_line(sys.argv)
  File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/__init__.py", line 446, in execute_from_command_line
    utility.execute()
  File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/__init__.py", line 440, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/base.py", line 414, in run_from_argv
    self.execute(*args, **cmd_options)
Process ForkProcess-3:
  File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/base.py", line 460, in execute
    output = self.handle(*args, **options)
  File "/home/benja/Documents/GitHub/P13_BS_API/BrawlClub/player_lookup/management/commands/update_all_players.py", line 130, in handle
    p_executor.submit(self.update_player_batch, *future.result())
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 644, in __exit__
    self.shutdown(wait=True)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 686, in shutdown
    self._queue_management_thread.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

What am I missing?

Thanks

  • You call "my_CPU_heavy_task" in the main process. You must give the uncalled method object as argument to "submit" and its arguments separately (as you did for the ThreadPoolExecutor). – Michael Butscher Nov 10 '22 at 19:24
  • You're right, that's just me being blind, thanks! Will try and report! – Benjamin_Mourgues Nov 10 '22 at 19:25
  • Minor improvement to clean up thread workers a little faster (if needed, and you're not using them after that initial `submit` loop): Add `executor.shutdown(wait=False)` just before the `with ProcessPoolExecutor() as p_executor:` line; as the submitted tasks queue empties, the worker threads will get cleaned up immediately (rather than all sticking around, waiting for more work that will never come, consuming resources for no reason). – ShadowRanger Nov 10 '22 at 19:37
  • @ShadowRanger I appreciate it! `wait=False` did kill some queries before they ended, though – Benjamin_Mourgues Nov 10 '22 at 19:51
  • @MichaelButscher I've updated the question, now it seems that the CPU_heavy_func actually never runs and the program holds for some reason – Benjamin_Mourgues Nov 10 '22 at 19:52
  • It's worth noting that mixing multiprocessing and multithreading is generally not safe - fork() only copies the active thread, and it is possible for another thread to be holding a lock when the process is copied. If that happens, the lock will never be released in the child process. You can get away with it if you set the start method to either spawn or forkserver (see the sketch after these comments). – Nick ODell Nov 10 '22 at 19:52
  • @Benjamin_Mourgues: It definitely shouldn't kill any outstanding tasks (there's a separate argument to cancel tasks that have been submitted but not dispatched, but it defaults to `False`). Running tasks *can't* be cancelled, and waiting tasks *aren't* cancelled. Only way I can see it causing problems is if you've got code you haven't shown (e.g. in the tasks submitted to the thread executor themselves) that add new tasks after the `shutdown` call (they'd be rejected, because the executor is shutting down). – ShadowRanger Nov 10 '22 at 19:57
  • @NickODell That's possibly what's up here, thanks for the heads up :/ – Benjamin_Mourgues Nov 10 '22 at 20:02
  • @ShadowRanger my_api_calls_func uses httpx.AsyncClient to make the queries, maybe that's a factor? – Benjamin_Mourgues Nov 10 '22 at 20:03
  • @Benjamin_Mourgues: Perhaps? By the way, one more warning: When you pass `self.my_CPU_heavy_task` as the function to submit to a `ProcessPoolExecutor`, it's serializing the whole instance, along with a reference to the method itself. If `self` is lightweight (just a handful of small attributes) you're probably fine, but it can get expensive if it, directly or indirectly through a chain of attributes, references biggish data. [Read more here](https://stackoverflow.com/a/38135787/364696). – ShadowRanger Nov 10 '22 at 20:09
  • Just to emphasize, you are better off calling functions or static methods instead of class or instance methods in a process pool. Also, there is a question about what `self.my_api_calls_func` returns. Its objects may cause pickling problems too. – tdelaney Nov 10 '22 at 20:38
  • "I noticed that the results aren't handled concurrently" could you explain what you mean here? what do you think it would look like if it was being "handled concurrently"? – Sam Mason Nov 11 '22 at 12:06
  • @tdelaney The API calls function does two things: it uses asyncio to fetch data from an API and makes a tuple (dict, dict, dict) out of it. This is what is returned. The CPU-heavy task then processes the tuple and saves objects in a psql database. – Benjamin_Mourgues Nov 11 '22 at 14:18
  • @SamMason I have logs that measure time to run, and basically it did 12s processing -> Done -> 12s processing -> Done. 12s is usually the time a single task takes to complete. – Benjamin_Mourgues Nov 11 '22 at 14:19
  • Have you run under something like https://github.com/plasma-umass/scalene to make sure time is being spent where you think it is? – Sam Mason Nov 11 '22 at 16:04
  • @SamMason Didn't know scalene, wonder how this will play with Django (which is where these functions run), thanks for the suggestion! – Benjamin_Mourgues Nov 15 '22 at 13:50
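
Putting the comment suggestions together (a spawn or forkserver start method, plus the optional `executor.shutdown(wait=False)`), a corrected version of the question's loop might look roughly like the sketch below. The module-level my_api_calls_func, my_CPU_heavy_task and batch_generator are placeholders standing in for the question's methods and generator; keeping the task functions at module level also sidesteps the bound-method pickling concern raised above.

import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def my_api_calls_func(qs):
    ...  # I/O-heavy API calls; returns the arguments for the CPU-heavy step

def my_CPU_heavy_task(*args):
    ...  # CPU-heavy processing of one batch

def run(batch_generator):
    futures = []
    # "spawn" starts fresh worker processes instead of forking the threaded
    # parent, so they cannot inherit a lock held by another thread.
    ctx = multiprocessing.get_context("spawn")
    with ThreadPoolExecutor() as executor, \
            ProcessPoolExecutor(mp_context=ctx) as p_executor:
        for start, end, total, qs in batch_generator:
            futures.append(executor.submit(my_api_calls_func, qs))
        # Optional: let idle worker threads exit as the submit queue drains;
        # already-submitted thread tasks still run to completion.
        executor.shutdown(wait=False)
        for future in as_completed(futures):
            p_executor.submit(my_CPU_heavy_task, *future.result())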

1 Answer


The problem mentioned by Nick ODell, which arises from mixing multiprocessing and multithreading on platforms that use fork, does not appear to occur when using the multiprocessing package (at least not on my Linux platform). You can, of course, force Python to use the spawn method when creating new processes. Creating a process with spawn is more expensive, but the pool's processes are only created once, and you appear to be submitting far more tasks to the pool than it has workers, so the extra overhead of spawn is amortized across all the submitted tasks. The increase in running time should therefore be a very small percentage, assuming the CPU-intensive task is not completely trivial (if it were, you should not be using multiprocessing at all).

The following demonstrates the basic pattern:

from multiprocessing.pool import Pool, ThreadPool

def cpu_heavy_processing(io_result):
    ...
    print(io_result, io_result ** 2) # for demo purposes

def io_heavy_processing(arg):
    ...
    # for demo just use the passed arg value + 1
    return arg + 1

with ThreadPool(10) as t_executor:
    p_executor = Pool()
    for io_result in t_executor.imap_unordered(io_heavy_processing, range(20)):
        p_executor.apply_async(cpu_heavy_processing, args=(io_result,))
    # Wait for all submitted tasks to complete:
    p_executor.close()
    p_executor.join()
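
The demo above uses the default start method (fork on Linux). If you do want to force spawn, as mentioned at the top of the answer, one way is to build the pool from a spawn context. A minimal sketch, reusing the demo functions above and adding the if __name__ == "__main__" guard that spawn needs (the child processes re-import the main module):

import multiprocessing
from multiprocessing.pool import ThreadPool

def cpu_heavy_processing(io_result):
    print(io_result, io_result ** 2)  # for demo purposes

def io_heavy_processing(arg):
    return arg + 1  # for demo purposes

if __name__ == "__main__":
    # Worker processes are started with "spawn" instead of "fork", so they do
    # not inherit state (such as held locks) from the threaded parent process.
    ctx = multiprocessing.get_context("spawn")
    with ThreadPool(10) as t_executor:
        p_executor = ctx.Pool()
        for io_result in t_executor.imap_unordered(io_heavy_processing, range(20)):
            p_executor.apply_async(cpu_heavy_processing, args=(io_result,))
        # Wait for all submitted tasks to complete:
        p_executor.close()
        p_executor.join()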

If you care about getting returned results in the correct order:

from multiprocessing.pool import Pool, ThreadPool

def cpu_heavy_processing(io_result):
    ...
    return io_result ** 2 # for demo purposes

def io_heavy_processing(arg):
    ...
    # for demo just use the passed arg value + 1
    return arg + 1

with ThreadPool(10) as t_executor:
    with Pool() as p_executor:
        async_results = [
            p_executor.apply_async(cpu_heavy_processing, args=(io_result,))
                for io_result in t_executor.imap(io_heavy_processing, range(20))
        ]
        results = [async_result.get() for async_result in async_results]
        print(results)

Prints:

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
– Booboo