I'm using the starmap_async function from Python's multiprocessing library. However, I noticed that if my code encounters an error in one of the processes, the exception is not thrown until all processes have finished. Here's the relevant code:

from multiprocessing import Pool, cpu_count
import datetime
import itertools
import time

import pandas as pd

# Excerpt: df_options, fixed_options, outputs_grab, n_rows, verbose and _run
# are defined elsewhere.
with Pool(max(cpu_count() // 2, 1)) as p:
    df_iter = df_options.iterrows()
    ir = itertools.repeat
    results = p.starmap_async(_run, zip(df_iter, ir(fixed_options), ir(outputs_grab)), chunksize=1)
    p.close()  # no more jobs to submit

    # Print progress while waiting (_number_left is a private attribute)
    n_remaining = results._number_left + 1
    while not results.ready():
        time.sleep(1)
        # Check for errors here ... How????
        # Then what? Call terminate()?????
        if verbose:
            if results._number_left < n_remaining:
                now = datetime.datetime.now()
                n_remaining = results._number_left
                print('%d/%d  %s' % (n_remaining, n_rows, str(now)[11:]))

    print('joining')
    p.join()

    all_results = results.get()

df = pd.DataFrame(all_results)

Currently, if I raise an error in one of the spawned processes, the other processes not only finish their running tasks but also start new ones, despite the error from one of the calls.

Some searching leads me to believe that this may not be possible. One person suggested I might need to use concurrent.futures instead, although it is unclear how to map my code onto that approach, especially while keeping the real-time feedback as processes finish.

discussion of concurrent.futures: https://stackoverflow.com/a/47108581/764365

Jimbo

1 Answer

TL;DR: Use imap_unordered to minimize the delay between a child raising an exception and the main process finding out about it. It lets the main process handle each result as soon as it arrives on the results queue, so an exception is re-raised in the main process as soon as the failed result is received. You can then use a wrapper function to build your own "star" version of the function if you want one. As a point of code design, most Pool methods re-raise exceptions from the child, whereas concurrent.futures stores the exception on the returned future object.

from random import random
from functools import partial
from multiprocessing import Pool
from time import sleep

def foo(a, b):
    sleep(random()) #introduce some processing delay to simulate work
    if random() > .95:
        raise Exception("randomly raised an exception")
    else:
        return f"{a}\t{b}"

def star_helper(func, args):
    return func(*args)

if __name__ == "__main__":
    n = 20
    print("chance of early termination:", (1-.95**n)*100, "%")
    with Pool() as p:
        try:
            for result in p.imap_unordered(partial(star_helper, foo), zip(range(n), range(n))):
                print(result)
        except:
            p.terminate()
            print("terminated")
    print("done") # reached promptly because the children were terminated early (leaving `with Pool()` also calls terminate())
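
For comparison, here is a rough sketch of the concurrent.futures style mentioned above, where the exception is read from the future instead of being re-raised while iterating. The names `work` and `run_until_first_error` are made up for illustration, and the task is assumed to fail for one fixed input so the behaviour is repeatable. Note that `Future.cancel()` only stops tasks that have not started yet; tasks that are already running are not interrupted.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


def work(a, b):
    # Hypothetical task: fails for one specific input.
    if a == 3:
        raise ValueError(f"task {a} failed")
    return a + b


def run_until_first_error(func, arg_tuples, executor_cls=ProcessPoolExecutor):
    """Return (results, error): the results collected before the first
    failure, and that failure (or None if every task succeeded)."""
    results = []
    with executor_cls() as ex:
        futures = [ex.submit(func, *args) for args in arg_tuples]
        for done, fut in enumerate(as_completed(futures), start=1):
            err = fut.exception()  # stored on the future, not raised here
            if err is not None:
                # Cancel whatever has not started yet, then stop collecting.
                for f in futures:
                    f.cancel()
                return results, err
            results.append(fut.result())
            print(f"{done}/{len(futures)} finished")
    return results, None


if __name__ == "__main__":
    results, err = run_until_first_error(work, [(i, i) for i in range(6)])
    print("results so far:", results)
    print("first error:", repr(err))
```

Passing `executor_cls=ThreadPoolExecutor` runs the same logic in threads, which can be handy for a quick check without spawning processes.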
Aaron
  • Just to add on, if you go this route, the results will not be ordered. If you want to preserve the order, use `pool.apply_async` to send the tasks one by one (without blocking) instead, so you can keep track of the results as they come in, just like `pool.imap_unordered` does. – Charchit Agarwal Jul 25 '22 at 22:35
  • I specifically used `imap_unordered` to minimize latency (the main process does not wait for correctly ordered output before starting the next loop iteration), but the regular `imap` would also work to keep things in order, with a minimum latency that depends on the `chunksize` and the number of workers. – Aaron Jul 26 '22 at 01:18
  • Works great! I made two small modifications. I am raising the last error after termination which I had to look up and is just `raise`. I also added an enumeration so I could get a counter for printing progress as it progresses. Thanks! – Jimbo Jul 29 '22 at 21:08
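
The two modifications described in the last comment (re-raising after `terminate()` and counting results with `enumerate`) might look roughly like the sketch below. This is not the commenter's actual code: `add` and `run_with_progress` are hypothetical names, and the task is assumed to fail for one fixed input rather than randomly.

```python
from functools import partial
from multiprocessing import Pool


def add(a, b):
    # Hypothetical task that fails for one input.
    if a == 3:
        raise ValueError(f"task {a} failed")
    return a + b


def star_helper(func, args):
    # Unpack one argument tuple, as starmap would.
    return func(*args)


def run_with_progress(pool, func, arg_tuples):
    arg_tuples = list(arg_tuples)
    results = []
    try:
        it = pool.imap_unordered(partial(star_helper, func), arg_tuples)
        # enumerate gives a running count of finished tasks
        for count, result in enumerate(it, start=1):
            results.append(result)
            print(f"{count}/{len(arg_tuples)} done")
    except Exception:
        pool.terminate()  # stop the remaining workers right away
        raise  # a bare raise re-raises the exception being handled
    return results


if __name__ == "__main__":
    with Pool() as p:
        print(run_with_progress(p, add, [(i, i) for i in range(3)]))
```

Because the results arrive in completion order, the counter reports how many tasks have finished, not which ones.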