2

How to exit from a function called my multiprocessing.Pool

Here is an example of the code I am using, when I put a condition to exit from function worker when I use this as a script in terminal it halts and does not exit.

def worker(n):
    if n == 4:
        exit("wrong number")  # tried to use sys.exit(1) did not work
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    result = pool.map(worker, mylist)
    pool.close()
    pool.join()
    return result

l = [2, 3, 60, 4]
myresult = caller(l, 4)
Medhat
  • 1,622
  • 16
  • 31
  • What exactly should that `exit` do? – David Ferenczy Rogožan Oct 09 '18 at 19:24
  • simply exit the script with error like the value is 4 ex: exit("Can not run the script value = 4") – Medhat Oct 09 '18 at 19:27
  • Just `return` from the function (or `return somevalue`) instead of calling `exit()`. You should also indent the last couple of lines inside an `if __name__ == '__main__':` – martineau Oct 09 '18 at 19:28
  • I tried `return 1` if there is code after this function it will process it normally , meanwhile I would like to stop execution and exit totally from the script. That is why I put a comment saying that I used `sys.exit(1)` – Medhat Oct 09 '18 at 19:30
  • 1
    You can't exit the main script from a worker process. – martineau Oct 09 '18 at 19:31
  • @martineau Thanks, simply that could be the answer to my question unless there is a way to exit the code (which was actually my question) – Medhat Oct 09 '18 at 19:34
  • I'm not aware of any way to do it—but maybe someone knows a trick... – martineau Oct 09 '18 at 19:37
  • I tried different ways to do it and I failed, that what pushed me to ask the question to share if it is even possible :) . – Medhat Oct 09 '18 at 19:38
  • If you [edit] your question and explain **why** you want to do such a thing, perhaps someone can suggest another technique that doesn't require doing this but still accomplishes the ultimate goal. – martineau Oct 09 '18 at 19:49
  • 1
    Did none of the answers work out for you? – Darkonaut Oct 26 '18 at 10:20
  • The second did not work, working on testing the first. – Medhat Oct 26 '18 at 22:20

2 Answers2

2

As I said, I don't think you can exit the process running the main script from a worker process.

You haven't explained exactly why you want to do this, so this answer is a guess, but perhaps raising a custom Exception and handling it in an explict except as shown below would be an acceptable way to workaround the limitation.

import multiprocessing
import sys

class WorkerStopException(Exception):
    pass

def worker(n):
    if n == 4:
        raise WorkerStopException()
    return n*2

def caller(mylist, n=1):
    n_cores = n if n > 1 else multiprocessing.cpu_count()
    print(n_cores)
    pool = multiprocessing.Pool(processes=n_cores)
    try:
        result = pool.map(worker, mylist)
    except WorkerStopException:
        sys.exit("wrong number")
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    l = [2, 3, 60, 4]
    myresult = caller(l, 4)

Output displayed when run:

4
wrong number

(The 4 is the number of CPUs my system has.)

martineau
  • 119,623
  • 25
  • 170
  • 301
  • The main idea is: I am processing some file and initiate an object contains info about this file: so if the file lack some info I need to exit execution and inform user that the file lacks this info and we exit the operation correct the file to continue. user can enter multiple files and that is why I am using Pool so they will be processed in parallel. Hope that helped. – Medhat Oct 09 '18 at 20:30
  • In that case it seems like the approach in my answer would be useful (if I'm understanding what you're saying correctly). – martineau Oct 09 '18 at 20:37
  • Thanks, I tried it but ended up with: `TypeError: catching classes that do not inherit from BaseException is not allowed` I am using python 3.5.4 – Medhat Oct 09 '18 at 20:56
  • I ran it with Python 3.7.0 (on Windows) so perhaps that restriction has been removed. – martineau Oct 09 '18 at 21:40
  • FWIW that error message sounds like you didn't use `Exception` as the base class of subclass `WorkerStopException` because `Exception` itself **is** a subclass of `BaseException`, so the custom subclass is also one (indirectly). – martineau Oct 09 '18 at 21:56
  • Please answer my question about whether the exception class your test code raises is a subclass of the the built-in `Exception` class or not. – martineau Apr 07 '21 at 13:30
1

The thing with pool.map is, that it will raise exceptions from child-processes only after all tasks are finished. But your comments sound like you need immediate abortion of all processing as soon as a wrong value is detected in any process. This would be a job for pool.apply_async then.

pool.apply_async offers error_callbacks, which you can use to let the pool terminate. Workers will be fed item-wise instead of chunk-wise like with the pool.map variants, so you get the chance for early exit on each processed argument.

I'm basically reusing my answer from here:

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    if x == 4:
        raise ValueError(f'wrong number: {x}')
    return x * 2

def on_error(e):
    if type(e) is ValueError:
        global terminated
        terminated = True
        pool.terminate()
        print(f"oops: {type(e).__name__}('{e}')")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(10)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())


if __name__ == '__main__':
    main()

Output:

f(0)
f(1)
f(2)
f(3)
f(4)
oops: ValueError('wrong number: 4')

Process finished with exit code 0
Darkonaut
  • 20,186
  • 7
  • 54
  • 65