
My reproduction is wrong, as noted in Rugnar's answer. I'm leaving the code mostly as-is, as I'm not sure where fixing it falls between clarifying and changing the meaning.

I have some thousands of jobs that I need to run and would like any error to halt execution immediately. I wrap the task in a try/except so that I can log the error (without all the multiprocessing/threading noise) and then re-raise. This does not kill the main process.

What's going on, and how can I get the early exit I'm looking for? sys.exit(1) in the child deadlocks, and wrapping the try/except/re-raise function in yet another function doesn't work either.

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_reraise.py", line 5, in f_reraise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_reraise.py", line 14, in <module>
    test_reraise()
  File "mp_reraise.py", line 12, in test_reraise
    p.map(f_reraise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
    try:
        raise Exception(args)
    except Exception as e:
        print(e)
        raise

def test_reraise():
    with multiprocessing.Pool() as p:
        p.map(f_reraise, range(10))

test_reraise()

If I don't catch and re-raise, execution stops early as expected: [this actually does not stop, as per Rugnar's answer]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_raise.py", line 4, in f_raise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_raise.py", line 10, in <module>
    test_raise()
  File "mp_raise.py", line 8, in test_raise
    p.map(f_raise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)  

mp_raise.py

import multiprocessing

def f_raise(*args):
    # missing print, which would demonstrate that
    # this actually does not stop early
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

test_raise()
Andrea Reina

1 Answer


In your mp_raise.py you don't print anything, so you don't see how many jobs were done. I added a print and found out that the pool only sees a child's exception once the jobs iterator is exhausted. So it never stops early.
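
For reference, a minimal sketch of that check: the asker's mp_raise.py with only the print added (and a __main__ guard). All ten argument tuples are printed, in some order, before the traceback appears, showing the pool drains the whole iterable even though job 0 has already failed.

import multiprocessing

def f_raise(*args):
    print(args)  # the added print: shows which jobs actually ran
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

if __name__ == '__main__':
    test_raise()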

If you need to stop early after an exception, try this:

import time
import multiprocessing as mp


def f_reraise(i):
    if abort.is_set():  # skip the job entirely if an abort has been signalled
        return
    time.sleep(i / 1000)  # add a sleep so jobs are not instant, as in real life
    if abort.is_set():  # also bail out mid-job if an abort has been signalled
        return
    print(i)
    try:
        raise Exception(i)
    except Exception as e:
        abort.set()  # signal all other workers to stop
        print('error:', e)
        raise


def init(a):
    global abort
    abort = a


def test_reraise():
    _abort = mp.Event()

    # jobs should stop being fed to the pool once an abort has been signalled,
    # so we wrap the jobs iterator this way
    def pool_args():
        for i in range(100):
            if not _abort.is_set():
                yield i

    # initializer/initargs is the way to share the Event with the worker processes,
    # thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
    with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
        p.map(f_reraise, pool_args())


if __name__ == '__main__':
    test_reraise()
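
An alternative sketch, not from the original answer: consume the results lazily with imap_unordered. A worker's exception is re-raised as soon as its result is fetched, and leaving the with block then calls terminate() on the pool, discarding jobs that haven't run yet. Tasks already dispatched may still start before the terminate lands, so the Event approach above gives tighter control over in-flight work.

import multiprocessing as mp

def f_raise(i):
    raise Exception(i)

def test_imap():
    with mp.Pool(8) as p:
        # imap_unordered yields results as they complete; consuming a
        # failed result re-raises the worker's exception here, and
        # Pool.__exit__ then terminates the pool, dropping pending jobs
        for _ in p.imap_unordered(f_raise, range(100)):
            pass

if __name__ == '__main__':
    test_imap()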
Rugnar
  • I swear uncaught exceptions in the actual application code were causing the main process to quit early, but this solves my Y problem so the X doesn't really matter I suppose. I'll wait the customary 24h to accept; feel free to ping me if I forget (a not-unlikely occurrence) :) – Andrea Reina Jan 31 '20 at 02:45