
In the code below, I raise an exception during the first call, yet the exception seems to be absorbed and all the other tasks still execute. What is the problem? What I want is: whenever the first exception occurs, print it and stop the multiprocessing pool immediately.

import multiprocessing
import time

def func(i):
    if i == 0:
        raise Exception()
    else:
        time.sleep(1)
        print(i)

num_workers = 4

pool = multiprocessing.Pool(num_workers)

try:
    for i in range(4):
        pool.apply_async(func,args=(i,))
except:
    print("err")

pool.close()
pool.join()

The following code, edited according to HTF's answer,

import multiprocessing
import time


# func must live at module level so worker processes can
# import it (required on platforms that use the spawn start method)
def func(i):
    if i == 0:
        raise Exception()
    else:
        time.sleep(1)
        print(i)


if __name__ == '__main__':
    num_workers = 4

    pool = multiprocessing.Pool(num_workers)

    results = [pool.apply_async(func, args=(i,)) for i in range(4)]

    try:
        for result in results:
            result.get()
    except:
        print("err")

    pool.close()
    pool.join()

gives the output

err
1
2
3

whereas I expected only err.

william007
  • From the looks of this you are wanting to do something like this https://stackoverflow.com/questions/36962462/terminate-a-python-multiprocessing-program-once-a-one-of-its-workers-meets-a-cer. – Axe319 Apr 07 '21 at 15:58

1 Answer


You have only scheduled the tasks; you also need to wait for the results:

results = [pool.apply_async(func, args=(i,)) for i in range(4)]

try:
    for result in results:
        result.get()
except:
    print("err")
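Calling get() re-raises the worker's exception in the parent process. To also stop the pool as soon as the first failure surfaces, you can call terminate() in the except block. A minimal sketch building on the snippet above (the exception message "boom" is made up for illustration):

```python
import multiprocessing
import time


def func(i):
    if i == 0:
        raise Exception("boom")  # hypothetical failure on the first task
    time.sleep(1)
    return i


if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    results = [pool.apply_async(func, args=(i,)) for i in range(4)]
    try:
        for result in results:
            result.get()  # re-raises the worker's exception here
        pool.close()  # normal shutdown: no more tasks accepted
    except Exception as exc:
        print(f"err: {exc}")
        pool.terminate()  # stop workers and discard queued tasks
    pool.join()
```

Since the first task fails almost immediately, terminate() kills the workers while the other tasks are still sleeping, so only the err line is printed.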

Update Wed 7 Apr 20:42:59 UTC 2021:

You can try something like this:

import time
  
from functools import partial
from multiprocessing import Pool


def func(i):
    if i == 0:
        raise Exception("something bad happened")
    else:
        time.sleep(1)
        print(i)


def quit(pool, err):
    print(f"ERROR: {err}")
    pool.terminate()


def main():
    pool = Pool()
    partial_quit = partial(quit, pool)

    for i in range(4):
        pool.apply_async(func, args=(i,), error_callback=partial_quit)

    pool.close()
    pool.join()


if __name__ == "__main__":
    main()

Test:

$ python test1.py
ERROR: something bad happened

If you need the return values back, it may actually be easier to use bare processes and a queue:

import time
  
from multiprocessing import Process, Queue

PROCS = 4


def worker(q, i):
    if i == 10:  # never true with PROCS = 4; set to e.g. 0 to exercise the error path
        print("error")
        q.put_nowait("ERROR")
    else:
        time.sleep(1)
        print(i)
        q.put_nowait(i)


def main():
    q = Queue()
    procs = []

    for i in range(PROCS):
        p = Process(target=worker, args=(q, i))
        p.start()
        procs.append(p)

    count = len(procs)

    while count:
        result = q.get()

        if result == "ERROR":
            for p in procs:
                p.terminate()
            break

        print(f"Result for: {result}")
        count -= 1


if __name__ == "__main__":
    main()

Test:

$ python test2.py
0
2
1
3
Result for: 0
Result for: 2
Result for: 1
Result for: 3
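For completeness, an alternative not covered above (a sketch, assuming Python 3): concurrent.futures.ProcessPoolExecutor gives the same fail-fast pattern through futures, where cancel() skips tasks that have not started yet:

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def func(i):
    if i == 0:
        raise Exception("something bad happened")
    time.sleep(1)
    return i


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(func, i) for i in range(4)]
        for future in as_completed(futures):
            try:
                print(f"Result for: {future.result()}")
            except Exception as exc:
                print(f"ERROR: {exc}")
                for f in futures:
                    f.cancel()  # only cancels tasks that have not started
                break
```

Note that cancel() cannot stop a task that is already running, so already-started workers finish their sleep before the executor shuts down; on Python 3.9+ executor.shutdown(cancel_futures=True) expresses the same intent more directly.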
HTF