I was trying to use multiprocessing.Queue
to save the return value of multiprocessing.Process
:
queue = Queue()
for i, (name, func) in enumerate(funcs.items()):
p = Process(target=analyse, args=(i, name, func, grid, queue))
The problem is that Queue.qsize
doesn't work on macOS, so I use the implementation in this answer.
class Queue(multiprocessing.queues.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
self.size = SharedCounter(0)
def put(self, *args, **kwargs):
self.size.increment(1)
super().put(*args, **kwargs)
Note that ctx=multiprocessing.get_context()
is added to fix the missing ctx
, according to this answer.
The code in question:
def analyse(i, name, func, grid, queue):
...
queue.put((i, name, single, minimum, current, peak))
And Python's complaint:
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "...", line 26, in analyse
queue.put((i, name, single, minimum, current, peak))
File "...", line 48, in put
self.size.increment(1)
AttributeError: 'Queue' object has no attribute 'size'
Any ideas where the error is? Tried to debug in PyCharm, queue
still has size
when it's passed to multiprocessing.Process
but it's no longer there when queue.put()
is called in analyse()
.
Edit: Feel free to answer this question. However, I gave up on Queue
and instead used multiprocessing.Manager
, sacrificing some precious milliseconds there.