I wrapped Python's (3.7 on Ubuntu) multiprocessing.Queue
with a class for easy usage of running a function on a list of items ("tasks"), using both an input and output Queue. But when I read the results from the output queue, I sometimes (approx 1 in 10 or 15 calls in the example below) get an Empty
exception, although the queue still has some items. I found this issue from 2014 where it is suggested to use a blocking get
with small timeout, but this doesn't help. I have used this code in the past successfully, but not wrapped in a class, so my guess is that it has something to do with the way I wrap it.
This is my class:
import multiprocessing
import os
class ProcessParallel():
def __init__(self):
self.qtasks = multiprocessing.JoinableQueue()
def process_queue(self, target_func, q_out):
pid = os.getpid()
#print(f'[pid {pid}] started processing queue')
while True:
task = self.qtasks.get()
if task is None:
#print(f'[pid {pid}] got stop signal from queue')
self.qtasks.task_done()
break
res = target_func(task)
q_out.put(res)
self.qtasks.task_done()
def process(self, num_jobs, target_func, task_list, q_out, verbose = True):
plist = []
for k in range(num_jobs):
plist.append(multiprocessing.Process(target = self.process_queue, args = (target_func, q_out)))
for p in plist:
p.start()
#--- populate the tasks queue, inc. a stop signal for each process
for task in task_list:
self.qtasks.put(task)
for _ in range(num_jobs):
self.qtasks.put(None)
if verbose:
print('waiting for the tasks queue to join')
self.qtasks.join()
if verbose:
print('tasks queue joined')
print(f'terminating {len(plist)} process')
for p in plist:
p.terminate()
if verbose:
print('done')
And this is my test code:
def myfun(x):
return 3 * x + 1
q_out = multiprocessing.Queue()
ppar = ProcessParallel()
ppar.process(4, myfun, [1,2,3,4], q_out, True)
print(f'{q_out.qsize()} results in output queue')
for _ in range(q_out.qsize()):
#r = q_out.get_nowait() # non-blocking call also raises Empty exception occasionally
r = q_out.get(timeout = 0.01)
print(f'got item from queue: {r}')