
I am trying to use multiprocessing.Pool to asynchronously dispatch some jobs to external processes, e.g.:

#!/bin/env python3
'''
    test.py
'''
import multiprocessing.util
from multiprocessing import Pool
import shlex
import subprocess
from subprocess import PIPE

multiprocessing.util.log_to_stderr(multiprocessing.util.DEBUG)

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

def main1():
    proc_pool = Pool(4)
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print(r)

if __name__ == '__main__':
    main1()

I would like it to terminate all spawned subprocesses, the pool workers, and itself on SIGINT. Currently this works fine with a pool size of 4:

$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140194873397248
[DEBUG/MainProcess] created semlock with handle 140194873393152
[DEBUG/MainProcess] created semlock with handle 140194873389056
[DEBUG/MainProcess] created semlock with handle 140194873384960
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-2] process shutting down
[DEBUG/ForkPoolWorker-2] running all "atexit" [DEBUG/ForkPoolWorker-3] runni[DEBUG/ForkPoolWorker-2] running the remaining[DEBUG/ForkPoolWorker-3] running the remaining "atexit" finalizers
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, en[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=1, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
= self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-2] process exiting with exitcode 1
Process ForkPoolWorker-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, endtime, timeout)
  File "/usr/local/lib/python3.6/subprocess.py", line 1496, in _communicate
    ready = selector.select(timeout)
  File "/usr/local/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
KeyboardInterrupt
[INFO/ForkPoolWorker-3] process exiting with exitcode 1
$>

But it starts to fail intermittently as the pool size is increased, e.g.:

$> ./test.py
[DEBUG/MainProcess] created semlock with handle 140143972425728
[DEBUG/MainProcess] created semlock with handle 140143972421632
[DEBUG/MainProcess] created semlock with handle 140143972417536
[DEBUG/MainProcess] created semlock with handle 140143972413440
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] doing set_length()
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-12] running all "atexit"[DEBUG/ForkPoolWorker-8] runnin[DEBUG/ForkPoolWorker-12] running the remaini[DEBUG/ForkPoolWorker-8] running the remaining "atexit" finalizers
Process ForkPoolWorker-4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./test.py", line 14, in waiter
    so, se = p.communicate()
  File "/usr/local/lib/python3.6/subprocess.py", line 836, in communicate
    stdout, stderr = self._communicate(input, endProcess ForkPoolWorker-9:
Traceback (most recent call last)[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] task handler exiting
(... Hangs here until I hit Ctrl-C again ...)
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 534, in _terminate_pool
    cls._help_stuff_finish(inqueue, task_handler, len(pool))
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 519, in _help_stuff_finish
    inqueue._rlock.acquire()
KeyboardInterrupt
$>

Most search paths lead here, but the bug that workaround refers to appears to have been nominally fixed in Python 3.3+, which would explain why it works some of the time; in any case, I tried that solution and it did not solve the problem. Does anyone have suggestions for fixing or further debugging this issue?

Environment: Python 3.6.1 on a SuSE box with 32 cores.

1 Answer


So, after much digging, I tried some other combinations of the suggestions in that post, and the following seems to fix the issue. As far as I can tell, if you allow SIGINT to propagate to the pool workers, as it does by default, and you have many more pool workers than jobs, an idle worker may be killed without releasing the lock on the pool's `._inqueue`. `_help_stuff_finish` then deadlocks trying to acquire a lock that the dead worker never released.
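
The underlying failure mode can be reproduced in isolation. Here is a minimal sketch of my own (not part of the fix; it assumes Linux's default fork start method): a process killed while holding a multiprocessing lock leaves that lock acquired forever, and any later acquire() simply blocks.

import multiprocessing
import os
import signal
import time

lock = multiprocessing.Lock()

def hold():
    lock.acquire()   # take the lock...
    time.sleep(60)   # ...and get killed before releasing it

if __name__ == '__main__':
    p = multiprocessing.Process(target=hold)
    p.start()
    time.sleep(0.5)                  # give the child time to acquire the lock
    os.kill(p.pid, signal.SIGKILL)   # simulate a worker dying mid-hold
    p.join()
    print(lock.acquire(timeout=2))   # prints False: the lock is abandoned

The fix below avoids this by having every worker ignore SIGINT, so only the main process ever handles it: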

#!/bin/env python3.6
from multiprocessing import Pool
import shlex
import signal
import subprocess
import sys
from subprocess import PIPE

def waiter(arg):
    cmd = "sleep 360"
    cmd_arg = shlex.split(cmd)
    p = subprocess.Popen(cmd_arg, stdout=PIPE, stderr=PIPE)
    so, se = p.communicate()
    print(f"{so}\n{se}")
    return arg

########################
# Adapted from: https://stackoverflow.com/a/44869451
#

proc_pool = None

def int_handler(*args, **kwargs):
    # Only the main process sees SIGINT now, so it is responsible for
    # shutting the pool down before exiting.
    if proc_pool:
        proc_pool.terminate()
        proc_pool.join()
    sys.exit(1)

def initializer():
    # Workers ignore SIGINT entirely, so they can never die while
    # holding the pool's inqueue lock.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

# Created at import time (this relies on the default fork start method)
# so that int_handler can reach it through the global.
proc_pool = Pool(32, initializer=initializer)

def main1():
    # Route SIGINT through the handler in the main process.
    signal.signal(signal.SIGINT, int_handler)
    it = proc_pool.imap_unordered(waiter, range(0, 4))
    for r in it:
        print(r)

if __name__ == '__main__':
    main1()
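
One caveat, as a sketch of my own rather than part of the fix above: proc_pool.terminate() stops the workers with SIGTERM, and a worker killed that way can leave its sleep child running as an orphan. If the external processes must die too, one option is to convert SIGTERM into SystemExit inside the worker so a finally block can kill the child (the _term helper below is hypothetical):

import shlex
import signal
import subprocess
from subprocess import PIPE

def _term(signum, frame):
    # Turn SIGTERM (sent by proc_pool.terminate()) into SystemExit so
    # that finally blocks run before the worker process exits.
    raise SystemExit(1)

def waiter(arg):
    signal.signal(signal.SIGTERM, _term)   # installed inside each worker
    p = subprocess.Popen(shlex.split("sleep 360"), stdout=PIPE, stderr=PIPE)
    try:
        so, se = p.communicate()
        print(f"{so}\n{se}")
        return arg
    finally:
        if p.poll() is None:   # child still running: take it down with us
            p.kill()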