
I've taken over some code a former colleague wrote, which frequently got stuck when one or more parallelised functions threw a NameError exception that wasn't caught. (The parallelisation is handled by multiprocessing.Pool.) Because the exception is due to certain arguments not being defined, the only way I've been able to catch it is to put the pool.apply_async calls into try...except blocks, like so:

from multiprocessing import Pool
import os

# Define worker functions
def workerfn1(args1):
  ...  # commands
def workerfn2(args2):
  ...  # more commands
def workerfn3(args3):
  ...  # even more commands

# Execute worker functions in parallel
with Pool(processes=os.cpu_count()-1) as pool:
  try:
    r1 = pool.apply_async(workerfn1, args1)
  except NameError as e:
    print("Worker function r1 failed")
    print(e)
  try:
    r2 = pool.apply_async(workerfn2, args2)
  except NameError as e:
    print("Worker function r2 failed")
    print(e)
  try:
    r3 = pool.apply_async(workerfn3, args3)
  except NameError as e:
    print("Worker function r3 failed")
    print(e)

Obviously, the try...except blocks are not parallelised, but the interpreter has to read the apply_async calls sequentially anyway while it assigns them to different CPUs... so will these three functions still be executed in parallel (if they don't throw the NameError exception), or does the use of try...except prevent this from happening?

CrowsNose

1 Answer


First, you need to be more careful in posting code that is not full of spelling and other errors.

Method multiprocessing.pool.Pool.apply_async (not apply_sync) returns a multiprocessing.pool.AsyncResult instance. It is only when you call method get on this instance that you either receive the return value from your worker function or any exception that occurred in your worker function is re-raised. So:

from multiprocessing import Pool


# Define worker functions
def workerfn1(args1):
    ...

def workerfn2(args2):
    ...

def workerfn3(args3):
    raise NameError('Some name goes here.')

# Required for Windows:
if __name__ == '__main__':
    # Execute worker functions in parallel
    with Pool(processes=3) as pool:
        result1 = pool.apply_async(workerfn1, args=(1,))
        result2 = pool.apply_async(workerfn2, args=(1,))
        result3 = pool.apply_async(workerfn3, args=(1,))
        try:
            return_value1 = result1.get()
        except NameError as e:
            print("Worker function workerfn1 failed:", e)
        try:
            return_value2 = result2.get()
        except NameError as e:
            print("Worker function workerfn2 failed:", e)
        try:
            return_value3 = result3.get()
        except NameError as e:
            print("Worker function workerfn3 failed:", e)

Prints:

Worker function workerfn3 failed: Some name goes here.
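
As an aside, instead of catching the exception around get, apply_async also accepts an error_callback argument (part of the standard multiprocessing.pool API), which is invoked in the parent process with the exception instance if the task fails. A minimal sketch (the worker and callback names here are invented for illustration):

```python
from multiprocessing import Pool

def failing_worker(x):
    # Invented worker that always raises, for demonstration only
    raise NameError('name is not defined')

def on_error(exc):
    # Invoked in the parent process with the exception raised by the worker
    print('Task failed:', exc)

if __name__ == '__main__':
    with Pool(processes=1) as pool:
        r = pool.apply_async(failing_worker, args=(1,), error_callback=on_error)
        pool.close()
        pool.join()  # wait for the task; on_error reports the failure
```

This avoids a try/except per task, but note that calling r.get() afterwards would still re-raise the exception.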

Note

Without calling get on the AsyncResult returned by apply_async, you are not waiting for the completion of the submitted task, so there is no point in surrounding the apply_async call itself with try/except. When you then fall through the end of the with block, an implicit call to terminate is made on the pool instance, which immediately kills all pool processes: any running tasks are halted and any tasks waiting to run are discarded. You can instead call pool.close() followed by pool.join() within the block; that sequence waits for all submitted tasks to complete. But without explicitly calling get on the AsyncResult instances you still cannot obtain return values or exceptions.
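
The close/join sequence described above can be sketched as follows (a minimal example; slow_worker and its inputs are invented for illustration):

```python
from multiprocessing import Pool

def slow_worker(x):
    # Invented worker for illustration: just doubles its input
    return x * 2

if __name__ == '__main__':
    pool = Pool(processes=2)
    results = [pool.apply_async(slow_worker, args=(i,)) for i in range(4)]
    pool.close()  # no further tasks may be submitted
    pool.join()   # blocks until every submitted task has finished
    # The tasks are complete, but return values still require get():
    print([r.get() for r in results])  # [0, 2, 4, 6]
```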

Booboo