During multiprocessing, an error can occur in the function called through istarmap. The error is caught inside that function with a try/except block, and each task is completely independent of the others. Yet it appears that workers are lost after a function failure. Do you know how to keep the workers processing the remaining tasks after one of them raises an error?
import contextlib
import multiprocessing as mp
import multiprocessing.pool as mpp
import signal
import sys

import tqdm


def CPU_Parallelization(Nb_CoresToBeUsed,
                        output_queue,
                        input_data,
                        output_result):
    # Force the "fork" start method once; ignore the error if it is already set.
    try:
        mp.set_start_method("fork")
    except RuntimeError:
        pass

    # The workers inherit SIG_IGN, so only the parent reacts to Ctrl-C.
    with ignore_interrupt_signals():
        p = mp.Pool(processes=Nb_CoresToBeUsed)
    manager = mp.Manager()  # <---
    stop_event = manager.Event()

    try:
        for Out_1, Out_2, Out_3, Out_4 in tqdm.tqdm(p.istarmap(funct1, input_data),
                                                    total=len(input_data), miniters=1):
            pass
        p.close()  # close once every task has been consumed
    except KeyboardInterrupt:  # <---
        stop_event.set()       # <---
        sys.exit()             # <---
    except Exception as e:
        pass
    p.close()
    p.join()
@contextlib.contextmanager
def ignore_interrupt_signals():
    # Temporarily ignore SIGINT so that freshly forked workers inherit SIG_IGN
    # and only the parent process handles Ctrl-C.
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    yield
    signal.signal(signal.SIGINT, previous_handler)
def funct1(input_data):
    # Placeholder body: the real work happens here and may raise.
    try:
        Out1 = 1
        Out2 = 2
        Out3 = 3
        Out4 = 4
    except Exception as e:
        print(e)
    return Out1, Out2, Out3, Out4
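For clarity, this is a minimal sketch (with a hypothetical wrapper name, safe_funct1) of the behaviour I am trying to obtain: the worker traps any exception at its top level and returns a sentinel value instead of letting the error escape, so the pool can keep consuming the remaining tasks.

def safe_funct1(input_data):
    # Hypothetical wrapper around funct1: never lets an exception escape the worker.
    try:
        return True, funct1(input_data)
    except Exception as e:
        # Return a sentinel so the parent can tell this particular task failed.
        return False, repr(e)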
def istarmap(self, func, iterable, chunksize=1):
    """
    Starmap version of imap.
    Source: https://stackoverflow.com/questions/57354700/starmap-combined-with-tqdm
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")
    if chunksize < 1:
        raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize))
    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    # result = mpp.IMapIterator(self._cache)  # Python < 3.8 signature
    result = mpp.IMapIterator(self)           # Python >= 3.8 signature
    self._taskqueue.put(
        (self._guarded_task_generation(result._job, mpp.starmapstar, task_batches),
         result._set_length))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap  # monkey-patch the Pool class
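For reference, here is a minimal, self-contained way I exercise the patched istarmap on its own (the add function and the pair list are purely illustrative, and this assumes Python 3.8+, matching the IMapIterator(self) line above):

import multiprocessing as mp

import tqdm


def add(a, b):
    return a + b


if __name__ == "__main__":
    pairs = [(i, i + 1) for i in range(100)]
    with mp.Pool(processes=2) as pool:
        # istarmap yields results one by one, so tqdm can show live progress.
        for total in tqdm.tqdm(pool.istarmap(add, pairs), total=len(pairs)):
            pass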