I was trying to use multiprocessing.Queue to collect the return values of functions run via multiprocessing.Process:

queue = Queue()
for i, (name, func) in enumerate(funcs.items()):
    p = Process(target=analyse, args=(i, name, func, grid, queue))

The problem is that Queue.qsize doesn't work on macOS, so I use the implementation in this answer.
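For context, this is roughly the failure on a plain multiprocessing.Queue when run on macOS (a minimal sketch, not code from my project):

import multiprocessing

q = multiprocessing.Queue()
q.put("x")
# Raises NotImplementedError on macOS because sem_getvalue() is not implemented:
print(q.qsize())

And the subclass taken from that answer: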

import multiprocessing
import multiprocessing.queues

class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)  # SharedCounter comes from the same linked answer

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super().put(*args, **kwargs)

Note that ctx=multiprocessing.get_context() is passed to supply the otherwise missing ctx argument, according to this answer.

The code in question:

def analyse(i, name, func, grid, queue):
    ...
    queue.put((i, name, single, minimum, current, peak))

And Python's complaint:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "...", line 26, in analyse
    queue.put((i, name, single, minimum, current, peak))
  File "...", line 48, in put
    self.size.increment(1)
AttributeError: 'Queue' object has no attribute 'size'

Any ideas where the error is? I tried to debug in PyCharm: queue still has size when it's passed to multiprocessing.Process, but the attribute is gone by the time queue.put() is called in analyse().

Edit: Feel free to answer this question. However, I gave up on Queue and instead used multiprocessing.Manager, sacrificing some precious milliseconds there.
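The Manager-based workaround looks roughly like this (a sketch only; analyse, funcs and grid are the same objects as above):

import multiprocessing

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    queue = manager.Queue()  # proxied queue; qsize() also works on macOS
    processes = []
    for i, (name, func) in enumerate(funcs.items()):
        p = multiprocessing.Process(target=analyse, args=(i, name, func, grid, queue))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

Every operation on the proxied queue goes through the manager's server process, hence the extra milliseconds.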

  • I've tried a simplified version (on Linux) and couldn't replicate your problem? – Timus Nov 09 '21 at 14:35
  • Okay, now I see it too, on macOS as well as Windows. – Timus Nov 09 '21 at 17:52
  • I've been having the same exact issue and gave up on it twice now. I'm trying to make my code work on Linux/MacOS but can't make the mac version work. – jriskin Jan 28 '22 at 01:04

1 Answer


OK, here is the complete working solution. Apparently the queue's extra state needs to be saved and restored when it is pickled for the child process, as mentioned here. The base class's __getstate__() only serializes its own internals, so a custom attribute like size is silently dropped whenever the start method pickles the queue (spawn, the default on macOS and Windows); with fork on Linux the child inherits the attribute, which is why the problem didn't reproduce there at first.

import multiprocessing
import multiprocessing.queues as mpq

class Queue(mpq.Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """
    def __init__(self, maxsize=-1, block=True, timeout=None):
        self.block = block
        self.timeout = timeout
        # ctx must be supplied when subclassing multiprocessing.queues.Queue.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        # Append the shared counter to the pickled state so it travels to the
        # child process along with the rest of the queue.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        # Restore the base queue state, then re-attach the counter.
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def put(self, *args, **kwargs):
        super(Queue, self).put(*args, **kwargs)
        self.size.increment(1)  # count the item only after it was enqueued

    def get(self, *args, **kwargs):
        item = super(Queue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()

class SharedCounter(object):
    """ A synchronized shared counter.
    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.
    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n = 0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value
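
Here is a minimal usage sketch (worker and its payload are placeholders, not the code from the question):

def worker(i, queue):
    queue.put((i, i * i))

if __name__ == "__main__":
    queue = Queue()
    processes = [multiprocessing.Process(target=worker, args=(i, queue)) for i in range(4)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(queue.qsize())  # 4, also on macOS
    while not queue.empty():
        print(queue.get())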
– jriskin