I am trying to use multiprocessing.Pool to asynchronously dispatch jobs to external processes, e.g.:
#!/bin/env python3
'''
test.py
'''
import multiprocessing.util
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE

multiprocessing.util.log_to_stderr(multiprocessing.util.DEBUG)

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

def main1():
    proc_pool = Pool(4)
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print(r)

if __name__ == '__main__':
    main1()
On SIGINT, I would like it to terminate all called subprocesses, the pool workers, and itself. Currently this works fine with a pool size of 4:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140194873397248
[DEBUG/MainProcess] created semlock with handle 140194873393152
[DEBUG/MainProcess] created semlock with handle 140194873389056
[DEBUG/MainProcess] created semlock with handle 140194873384960
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-2] process shutting down
[DEBUG/ForkPoolWorker-2] running all "atexit" [DEBUG/ForkPoolWorker-3] runni[DEBUG/ForkPoolWorker-2] running the remaining[DEBUG/ForkPoolWorker-3] running the remaining "atexit" finalizers
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, en[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=1, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
= self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-2] process exiting with exitcode 1
Process ForkPoolWorker-3:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, endtime, timeout)
File "/usr/local/lib/python3.6/subprocess.py", line 1496, in _communicate
ready = selector.select(timeout)
File "/usr/local/lib/python3.6/selectors.py", line 376, in select
fd_event_list = self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-3] process exiting with exitcode 1
$>
But it starts to fail intermittently as the pool size is increased, e.g.:
$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140143972425728
[DEBUG/MainProcess] created semlock with handle 140143972421632
[DEBUG/MainProcess] created semlock with handle 140143972417536
[DEBUG/MainProcess] created semlock with handle 140143972413440
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-12] running all "atexit"[DEBUG/ForkPoolWorker-8] runnin[DEBUG/ForkPoolWorker-12] running the remaini[DEBUG/ForkPoolWorker-8] running the remaining "atexit" finalizers
Process ForkPoolWorker-4:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "./test.py", line 14, in waiter
so, se = p.communicate()
File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
stdout, stderr = self._communicate(input, endProcess ForkPoolWorker-9:
Traceback (most recent call last)[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] task handler exiting
(... Hangs here until I hit Ctrl-C again ...)
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 534, in _terminate_pool
cls._help_stuff_finish(inqueue, task_handler, len(pool))
File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 519, in _help_stuff_finish
inqueue._rlock.acquire()
KeyboardInterrupt
$>
Most search paths lead to the same well-known workaround, but the bug that workaround addresses appears to have been nominally fixed in Python 3.3+, which would explain why my code works some of the time. In any case, I tried that solution and it did not fix the problem. Does anyone have suggestions for fixing or further debugging this issue?
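In case it helps, this is the kind of variant I have been experimenting with: workers ignore SIGINT via an initializer, so only the parent sees KeyboardInterrupt and can terminate the pool. (This is a sketch, not my exact code — the sleep is shortened to 0.1 s so it runs quickly, and `run_jobs` is a hypothetical wrapper name.)

```python
import signal
import subprocess
from multiprocessing import Pool

def init_worker():
    # Workers ignore SIGINT so only the parent process reacts to Ctrl-C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def waiter(arg):
    # Shortened sleep so the sketch finishes quickly.
    subprocess.run(["sleep", "0.1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return arg

def run_jobs(n=4):
    pool = Pool(n, initializer=init_worker)
    try:
        results = sorted(pool.imap_unordered(waiter, range(n)))
        pool.close()
        return results
    except KeyboardInterrupt:
        # terminate() kills the pool workers, but NOT their sleep
        # grandchildren -- those still need separate cleanup.
        pool.terminate()
        raise
    finally:
        pool.join()

if __name__ == "__main__":
    print(run_jobs(4))
```

Even with this, the external `sleep` processes are orphaned on terminate(), so it only solves part of what I want.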
Environment: Python 3.6.1 on a SUSE box with 32 cores.
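For the "terminate all called subprocesses" part, one idea I have been considering is starting each external command in its own process group so the whole tree can be signalled at once. A sketch (`run_in_group` is a hypothetical helper, not part of my script):

```python
import os
import signal
import subprocess

def run_in_group(cmd):
    # start_new_session=True puts the child in its own session/process
    # group, so one killpg() reaches the child and anything it spawns.
    p = subprocess.Popen(cmd, start_new_session=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        return p.communicate()
    except KeyboardInterrupt:
        # p.pid is the group leader, so this signals the entire group.
        os.killpg(p.pid, signal.SIGTERM)
        p.wait()
        raise
```

Whether this plays nicely with the Pool shutdown sequence above is exactly what I am unsure about.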