0

During multiprocessing process, error could occur in the function called by istarmap. The error of the function is catched by try, except, ... Each function process is completely independent. But it appears that workers are lost after function failure. Do you know how to keep workers to process the next operations after a function error ?

def CPU_Parallelization(Nb_CoresToBeUsed,
                        output_queue,
                        input_data,
                        output_result):

    try:
        mp.set_start_method("fork")
    except RuntimeError:
        pass

    with ignore_interrupt_signals():
        p = mp.Pool(processes=Nb_CoresToBeUsed)

    manager = mp.Manager()  # <---
    stop_event = manager.Event()

    try:
        for Out_1,\
            Out_2,\
            Out_3,\
            Out_4 in tqdm.tqdm(p.istarmap(function1, input_data), total=len(input_data), miniters=1):
            pass

        PoolImageGeneration.close()

    except KeyboardInterrupt: # <---
            stop_event.set() # <---
            sys.exit() # <---
            p.close()
            p.join()
    except Exception as e:
        pass

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    yield
    signal.signal(signal.SIGINT, previous_handler)



def funct1(input_data):
    try:
        Out1 = 1
        Out2 = 2
        Out3 = 3
        Out4 = 4
    except Exception as e:
        print(e)
    return Out1, Out2, Out3, Out4



def istarmap(self, func, iterable, chunksize=1):
    """
    Starmap-version of imap
    Source : https://stackoverflow.com/questions/57354700/starmap-combined-with-tqdm
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    #result = mpp.IMapIterator(self._cache)
    result = mpp.IMapIterator(self)
    self._taskqueue.put((self._guarded_task_generation(result._job, mpp.starmapstar, task_batches), result._set_length))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap
EdouardDKP
  • 71
  • 6
  • Does anyone has a solution to avoid the workers lost ? – EdouardDKP Sep 23 '22 at 07:18
  • Please edit your question and add the code for `function1`. And AFAICT, `multiprocessing.Pool` doesn't have an `istarmap` method. Is that a typo? – Roland Smith Sep 23 '22 at 08:36
  • istarmap is not a typo. I have added the code. Regarding function1, the code is really long so I have just added the main structure. – EdouardDKP Sep 23 '22 at 20:17
  • How do you know that workers are lost? – Roland Smith Sep 23 '22 at 20:25
  • I start with 23 cores for that task. In Ubuntu System Monitor, I see that all the 23 cores are loaded at 100%. If an error occur, the cores loaded at 100% are usually decreasing up to 1 core loaded at 100%. And my process take much much longer time. – EdouardDKP Sep 23 '22 at 20:36
  • I don't think there is enough information for a definitive answer here. So that's why I'm making this a comment and not an answer. But what I guess is happening is that something that your code does is disturbing the worker/parent communication in `multiprocessing`. I suspect that the thread that `multiprocessing` uses to do this communication hangs, leaving the workers without work. Maybe the size of the return tuple is too large, or it contains something that can't be pickled. Or maybe your `istarmap` has a bug; try with a normal `starmap` to see if that fixes the problem. – Roland Smith Sep 23 '22 at 21:50

0 Answers0