I'm using the starmap_async function from Python's multiprocessing library. However, I noticed that if my code encounters an error in one of the processes, the exception is not raised until all of the processes have finished. Here's the relevant code:
from multiprocessing import Pool, cpu_count
import datetime
import itertools
import time

import pandas as pd

with Pool(max(cpu_count() // 2, 1)) as p:
    # p = Pool()
    df_iter = df_options.iterrows()
    ir = itertools.repeat
    results = p.starmap_async(_run, zip(df_iter, ir(fixed_options), ir(outputs_grab)), chunksize=1)
    p.close()  # no more jobs to submit

    # Printing progress
    n_remaining = results._number_left + 1  # +1 so the first pass through the loop prints
    while not results.ready():
        time.sleep(1)
        # Check for errors here ... How ????
        # Then what? call terminate()?????
        if verbose:
            if results._number_left < n_remaining:
                now = datetime.datetime.now()
                n_remaining = results._number_left
                print('%d/%d %s' % (n_remaining, n_rows, str(now)[11:]))
    print('joining')
    p.join()

all_results = results.get()
df = pd.DataFrame(all_results)
Currently, if I raise an error in one of the spawned processes, the other processes not only run to completion but also start new tasks, despite one of the calls having already failed.
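The closest workaround I've come up with so far is to submit each row with apply_async instead, because (as far as I can tell) apply_async's error_callback fires as soon as that one task fails, whereas starmap_async's error_callback only fires once every task has finished. Here's a sketch of that idea, with _run, df_options, fixed_options, outputs_grab, and the pool size as in my code above; first_error and _on_error are just illustrative names of mine:

from multiprocessing import Pool, cpu_count
import time

first_error = []  # appended to by the pool's result-handler thread

def _on_error(exc):
    # apply_async calls this with the worker's exception as soon as
    # that one task fails
    first_error.append(exc)

with Pool(max(cpu_count() // 2, 1)) as p:
    async_results = [
        p.apply_async(_run, (row, fixed_options, outputs_grab),
                      error_callback=_on_error)
        for row in df_options.iterrows()
    ]
    p.close()  # no more jobs to submit
    while not all(r.ready() for r in async_results):
        time.sleep(1)
        if first_error:
            p.terminate()         # drop the tasks that haven't run yet
            raise first_error[0]  # re-raise the worker's exception
    p.join()

all_results = [r.get() for r in async_results]

But this feels like it's fighting the API, so I'm hoping there is a cleaner way.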
Some searching is leading me to believe that this may not be possible. One person seemed to suggest I might need to use concurrent.futures instead, although it is unclear to me how to map my example onto that API, especially while keeping the real-time feedback as tasks finish.

Discussion of concurrent.futures: https://stackoverflow.com/a/47108581/764365
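For reference, this is as far as I got translating my example to concurrent.futures along the lines of that answer. It's only a sketch under the same assumptions as above (_run, df_options, fixed_options, outputs_grab, verbose defined elsewhere); fut.result() re-raises a worker's exception as soon as that future completes, but note that as_completed yields results in completion order, not submission order, and Future.cancel() only stops tasks that haven't started yet:

import concurrent.futures
import datetime
import os

import pandas as pd

with concurrent.futures.ProcessPoolExecutor(max_workers=max(os.cpu_count() // 2, 1)) as ex:
    futures = [ex.submit(_run, row, fixed_options, outputs_grab)
               for row in df_options.iterrows()]
    n_rows = len(futures)
    n_remaining = n_rows
    all_results = []
    try:
        for fut in concurrent.futures.as_completed(futures):
            # .result() re-raises the worker's exception immediately
            all_results.append(fut.result())
            n_remaining -= 1
            if verbose:
                now = datetime.datetime.now()
                print('%d/%d %s' % (n_remaining, n_rows, str(now)[11:]))
    except Exception:
        for f in futures:
            f.cancel()  # cancel the tasks that haven't started yet
        raise

# results arrive in completion order, so rows may need reordering
df = pd.DataFrame(all_results)

Is something like this the intended approach, or is there a way to get early failure out of starmap_async itself?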