
I wrapped Python's multiprocessing.Queue (Python 3.7 on Ubuntu) with a class to make it easy to run a function on a list of items ("tasks"), using both an input and an output queue. But when I read the results from the output queue, I sometimes (roughly 1 in 10 or 15 calls in the example below) get an Empty exception, even though the queue still has items. I found this issue from 2014 where a blocking get with a small timeout is suggested, but that doesn't help. I have used this code successfully in the past, but not wrapped in a class, so my guess is that the problem has something to do with the way I wrap it.

This is my class:

import multiprocessing
import os

class ProcessParallel():
    def __init__(self):
        self.qtasks = multiprocessing.JoinableQueue()
    
    def process_queue(self, target_func, q_out):
        pid = os.getpid()
        #print(f'[pid {pid}] started processing queue')
        while True:
            task = self.qtasks.get()
            if task is None:
                #print(f'[pid {pid}] got stop signal from queue')
                self.qtasks.task_done()
                break
            res = target_func(task)
            q_out.put(res)
            self.qtasks.task_done()
    
    def process(self, num_jobs, target_func, task_list, q_out, verbose = True):        
        plist = []
        for k in range(num_jobs):
            plist.append(multiprocessing.Process(target = self.process_queue, args = (target_func, q_out)))

        for p in plist:
            p.start()

        #--- populate the tasks queue, inc. a stop signal for each process
        for task in task_list:
            self.qtasks.put(task)
        
        for _ in range(num_jobs):
            self.qtasks.put(None)
        
        if verbose:
            print('waiting for the tasks queue to join')
        self.qtasks.join()
        if verbose:
            print('tasks queue joined')
            print(f'terminating {len(plist)} processes')
            
        for p in plist:
            p.terminate()
        if verbose:
            print('done')

And this is my test code:

def myfun(x):
    return 3 * x + 1

q_out = multiprocessing.Queue()
ppar = ProcessParallel()
ppar.process(4, myfun, [1,2,3,4], q_out, True)

print(f'{q_out.qsize()} results in output queue')
for _ in range(q_out.qsize()):
    #r = q_out.get_nowait() # non-blocking call also raises Empty exception occasionally 
    r = q_out.get(timeout = 0.01)
    print(f'got item from queue: {r}')

When the exception is raised, the output shows a queue.Empty traceback from the q_out.get call, even though qsize() reported that items remained.

  • The exception is not reproducible in my environment (Win10, Py3.10, program launched from the command line). – Paul Cornelius Feb 28 '23 at 00:28
  • After updating to Python 3.10, the problem does not appear in my test code above, but it does occur when running with my actual function and data. – Itamar Katz Mar 01 '23 at 08:15
  • I can reproduce your issue with Python 3.11, Linux (fork & spawn). This happens when you end up with a corrupted (deadlocked) `multiprocessing.Queue` due to premature child-process termination. Deleting the `Process.terminate()`-loop in your code should resolve it. – Darkonaut Mar 01 '23 at 18:38
  • thanks, it works - but also see my answer below, which describes what is apparently another problem (putting too-large objects in the output queue). Can you please explain the reason in more detail? I am calling `terminate` only after the input queue (`qtasks` in my code) joins, meaning no `put` operation is done on the output queue after that point – Itamar Katz Mar 02 '23 at 08:56
  • Also, if you turn your comment into an answer, I'll accept it – Itamar Katz Mar 02 '23 at 08:56
  • I see @CharchitAgarwal already filled you in on some details. The return from `multiprocessing.Queue.put()` doesn't mean anything has actually been sent; that happens eventually via a separate feeder thread. So you cannot depend on `qtasks.join()` as meaning "results received" at this point. There's no separate problem with object sizes. Never use `.qsize()` for control flow; use something reliably known, like the number of tasks, instead. It's not clear to me why you're using a `timeout` in the first place, since you're also not handling the `Empty` exception. – Darkonaut Mar 02 '23 at 13:55
  • thanks @Darkonaut for the info. For my actual final code I am not using the output queue, instead each process writes the answer to a database. I find it more reliable when item size is not small (like a single int). Regarding the timeout, as I mentioned this is just a test code, I don't mind the exception breaking the flow. I tried with/without timeout to see the difference. – Itamar Katz Mar 07 '23 at 07:42
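
Putting the suggestions from these comments together, a minimal, self-contained sketch of the safer pattern might look like the following. The names (worker, q_in, tasks, ...) are illustrative and not from the original code: workers stop on a None sentinel, the parent drains exactly len(tasks) results with a plain blocking get() instead of trusting qsize(), and the worker processes are join()ed rather than terminate()d so their queue feeder threads can finish flushing.

import multiprocessing

def myfun(x):
    return 3 * x + 1

def worker(q_in, q_out):
    # Pull tasks until the None sentinel arrives, pushing results to q_out.
    while True:
        task = q_in.get()
        if task is None:
            break
        q_out.put(myfun(task))

if __name__ == '__main__':
    tasks = [1, 2, 3, 4]
    num_jobs = 4

    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()

    procs = [multiprocessing.Process(target=worker, args=(q_in, q_out))
             for _ in range(num_jobs)]
    for p in procs:
        p.start()

    for task in tasks:
        q_in.put(task)
    for _ in range(num_jobs):   # one stop sentinel per worker
        q_in.put(None)

    # Drain by the known task count; a blocking get() waits until each
    # worker's feeder thread has actually flushed the result into the pipe.
    results = [q_out.get() for _ in tasks]

    # Join (don't terminate) only after the output queue has been drained,
    # so no worker is killed while its feeder thread is still writing.
    for p in procs:
        p.join()

    print(results)

Draining before joining follows the order recommended in the multiprocessing docs: a child that still has buffered items will not exit until they are flushed to the pipe, so joining (or terminating) before reading everything can deadlock or corrupt the queue.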

1 Answer


After updating to Python 3.10, the problem disappeared for the test code in my question, but it still remained for my real use case, where both the size of the input and output data and the number of jobs are bigger. It turns out the output Queue can't handle items above some size. For integers (as in the example) there is no problem, but in my case the output is a (small) pandas DataFrame. Actually, I have been burnt in the past whenever I tried to write the output to a Queue, and always fell back to writing it to disk, a DB table, etc. But I thought that this time it would work, since each returned item is quite small.

In case anyone wants to reproduce the problem, just replace the function as follows:

import numpy as np

def myfun(x):
    # ignore the input and return a larger object (a 100-element array)
    return np.arange(100)

and run it with an input of size 100 and (say) 20 jobs:

njobs = 20
ppar.process(njobs, myfun, range(100), q_out, verbose = False)

There is a clear correlation between the size of the output and the number of items retrieved before the Empty exception kicks in. So it is most likely a memory issue/limit in the Queue class; I just wish it were documented.

  • Queues are simply a wrapper around pipes, which have a fixed size that varies from machine to machine. When you put items on a queue, they are not immediately put on the internal pipe (it may already be full) but on a process-exclusive buffer instead. When there is space available on the pipe again (when a process does `queue.get()`), that's when the enqueued items are actually put on the pipe. So there could be a small delay before `queue.get()` returns the item (and not the `Empty` exception). This is also documented: https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues – Charchit Agarwal Mar 02 '23 at 09:02
  • Some more context which may be useful here: https://stackoverflow.com/a/75565500/16310741 – Charchit Agarwal Mar 02 '23 at 09:03
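
As a small illustration of the feeder-thread behaviour described in the comment above (and noted in the multiprocessing documentation), even within a single process a non-blocking get immediately after a put can occasionally raise Empty, because put() returns before the background feeder thread has written the item to the pipe; a plain blocking get() simply waits for it. This is only a sketch to make the timing issue visible, not code from the original post.

import multiprocessing
import queue

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put('hello')  # hands the item to a background feeder thread

    try:
        # May occasionally raise queue.Empty: put() has returned, but the
        # feeder thread may not have flushed the item into the pipe yet.
        print('got immediately:', q.get_nowait())
    except queue.Empty:
        print('Empty raised; falling back to a blocking get()')
        print('blocking get:', q.get())  # waits until the item is there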