System information

  • Python 3.8.7
  • macOS 11.1 (Big Sur)
  • Python installed via brew install python@3.8

To reproduce on Big Sur and most probably older versions:

import multiprocessing as mp


if __name__ == '__main__':
    exp_queue = mp.Queue()
    print(exp_queue.qsize())

Results in:

  File "/Users/username/Library/Application Support/JetBrains/PyCharm2020.3/scratches/scratch.py", line 5, in <module>
    print(exp_queue.qsize())
  File "/usr/local/Cellar/python@3.8/3.8.7/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 120, in qsize
    return self._maxsize - self._sem._semlock._get_value()
NotImplementedError

It looks like whoever wrote line 120 of multiprocessing/queues.py is aware of the issue, but I can't find a solution anywhere:

def qsize(self):
    # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
    return self._maxsize - self._sem._semlock._get_value()
watch-this

2 Answers

As Víctor Terrón has suggested in a GitHub discussion, you can use his implementation:

https://github.com/vterron/lemon/blob/d60576bec2ad5d1d5043bcb3111dff1fcb58a8d6/methods.py#L536-L573

According to its documentation:

A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively. This not only prevents NotImplementedError from being raised, but also allows us to implement a reliable version of both qsize() and empty().
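The idea described above can be sketched independently in a few lines. This is a minimal sketch of the shared-counter approach, not Víctor Terrón's code: the class name CountingQueue is hypothetical, and it assumes that multiprocessing.queues.Queue keeps accepting a keyword-only ctx argument and exposing its pickling state as a tuple, as it does in CPython 3.8.

```python
import multiprocessing as mp
from multiprocessing.queues import Queue as _BaseQueue


class CountingQueue(_BaseQueue):
    """Hypothetical Queue subclass that tracks its size in a shared counter."""

    def __init__(self, maxsize=0, *, ctx=None):
        ctx = ctx or mp.get_context()
        super().__init__(maxsize, ctx=ctx)
        # Shared integer guarded by its own lock; no sem_getvalue() involved.
        self._count = ctx.Value('i', 0)

    # The base class defines its own pickling state, so the counter has to be
    # carried along explicitly when the queue is sent to a spawned process.
    def __getstate__(self):
        return super().__getstate__() + (self._count,)

    def __setstate__(self, state):
        *base_state, self._count = state
        super().__setstate__(tuple(base_state))

    def put(self, *args, **kwargs):
        # Increment only after the item was accepted (put may raise queue.Full).
        super().put(*args, **kwargs)
        with self._count.get_lock():
            self._count.value += 1

    def get(self, *args, **kwargs):
        # Decrement only after an item was received (get may raise queue.Empty).
        item = super().get(*args, **kwargs)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        return self._count.value

    def empty(self):
        return self.qsize() == 0
```

Create it with q = CountingQueue() wherever mp.Queue() was used. The counter is updated just after each put() or get(), so under concurrency qsize() can lag the real state by a moment; treat it as an approximation, as with any queue size.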

Arman
    To anyone finding this, be aware that Víctor Terrón's Queue implementation is licensed under the GPL, meaning that you can't use it in your code without also making your code GPL licensed. – Ben Burns Aug 14 '22 at 09:05

Simplest solution: use multiprocessing.Manager. Its Queue() method returns a proxy to a queue.Queue living in the manager process, so qsize() is implemented on every platform, and you don't have to copy and paste GPL-licensed code. Quick example:

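import multiprocessing
import time


def worker(x, que):
    # Each task pushes its result onto the shared, manager-backed queue.
    que.put(x**2)


if __name__ == '__main__':
    inputs = list(range(1000))

    m = multiprocessing.Manager()
    q = m.Queue()  # proxy object; qsize() does not rely on sem_getvalue()

    # The with-block shuts the pool down once all results have arrived.
    with multiprocessing.Pool(processes=5) as pool:
        workers = [pool.apply_async(worker, (i, q)) for i in inputs]
        # Poll until every task has reported its result.
        while q.qsize() < len(inputs):
            time.sleep(1)

    results = [q.get() for _ in range(q.qsize())]
    assert len(results) == len(inputs)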

I'm running Ventura 13.0.1 (macOS) and it works fine.

lkwbr