I have a script with two parts: one that is I/O heavy (API calls) and one that is CPU heavy (processing the API output).
I would like to run the API calls in a thread pool so they execute concurrently and, as the results come in, hand them over to a process pool for processing.
My code looks like this:
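from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

futures = []
with ThreadPoolExecutor() as executor:
    # Submit one API-call task per batch of players (I/O bound).
    for start, end, total, qs in batch_generator:
        logger.info(f"Processing players {start + 1} to {end} of {total}")
        futures.append(
            executor.submit(self.my_api_calls_func, qs)
        )
    with ProcessPoolExecutor() as p_executor:
        # Hand each finished API result to the process pool (CPU bound).
        for future in as_completed(futures):
            print("completed thread task")
            p_executor.submit(self.my_CPU_heavy_task, *future.result())
            print("player batch submitted to process pool")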
However, I noticed that the results aren't handled concurrently. My logs look like this:
Processing players 1 to 500 of 5000
Processing players 501 to 1000 of 5000
Processing players 1001 to 1500 of 5000
Processing players 1501 to 2000 of 5000
...
Processing players 4501 to 5000 of 5000
completed thread task
player batch submitted to process pool
completed thread task
player batch submitted to process pool
completed thread task
player batch submitted to process pool
...
The task submitted via p_executor.submit never appears to run; the script just hangs.
When I interrupt it with Ctrl+C, the traceback shows this:
^CProcess ForkProcess-4:
Process ForkProcess-1:
Process ForkProcess-8:
Process ForkProcess-7:
Process ForkProcess-5:
Process ForkProcess-2:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
Traceback (most recent call last):
KeyboardInterrupt
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
Traceback (most recent call last):
Process ForkProcess-6:
Traceback (most recent call last):
File "manage.py", line 22, in <module>
main()
File "manage.py", line 18, in main
execute_from_command_line(sys.argv)
File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/__init__.py", line 446, in execute_from_command_line
utility.execute()
File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/__init__.py", line 440, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/base.py", line 414, in run_from_argv
self.execute(*args, **cmd_options)
Process ForkProcess-3:
File "/home/benja/Documents/GitHub/P13_BS_API/venv/lib/python3.8/site-packages/django/core/management/base.py", line 460, in execute
output = self.handle(*args, **options)
File "/home/benja/Documents/GitHub/P13_BS_API/BrawlClub/player_lookup/management/commands/update_all_players.py", line 130, in handle
p_executor.submit(self.update_player_batch, *future.result())
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 644, in __exit__
self.shutdown(wait=True)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 686, in shutdown
self._queue_management_thread.join()
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/threading.py", line 1011, in join
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/usr/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
self._wait_for_tstate_lock()
File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
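For reference, the pattern attempted above can be reduced to a minimal standalone sketch. This is only an illustration under assumptions, not the actual management command: `fetch_batch`, `crunch`, `run_pipeline`, `io_pool_cls` and `cpu_pool_cls` are hypothetical names, and the two stand-in functions are defined at module level because `ProcessPoolExecutor` has to pickle the callable and its arguments.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


def fetch_batch(batch):
    """Hypothetical stand-in for the I/O-bound API call."""
    return list(batch)


def crunch(rows):
    """Hypothetical stand-in for the CPU-bound processing step."""
    return sum(x * x for x in rows)


def run_pipeline(batches, io_pool_cls=ThreadPoolExecutor, cpu_pool_cls=ProcessPoolExecutor):
    """Fetch every batch in one pool and pass each result to the other as it arrives."""
    results = []
    with io_pool_cls() as io_pool, cpu_pool_cls() as cpu_pool:
        # Stage 1: one fetch task per batch.
        fetches = [io_pool.submit(fetch_batch, b) for b in batches]
        # Stage 2: submit each fetched result as soon as its fetch completes.
        # The returned futures are kept, because an exception raised in a
        # worker is stored on its future and only re-raised by .result().
        jobs = [cpu_pool.submit(crunch, f.result()) for f in as_completed(fetches)]
        for job in as_completed(jobs):
            results.append(job.result())
    # Completion order is not deterministic, so sort for a stable return value.
    return sorted(results)
```

Called from under an `if __name__ == "__main__":` guard, `run_pipeline([[1, 2], [3]])` is expected to return `[5, 9]`. Unlike the code in the question, the sketch keeps the futures returned by the second pool and calls `.result()` on them, so a failure inside a worker would be raised in the main process instead of passing silently.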
What am I missing?
Thanks