
I want to count how many elements have ever been put into a multiprocessing.Queue. My implementation subclasses multiprocessing.queues.Queue (the class behind multiprocessing.Queue):

import multiprocessing
from multiprocessing.queues import Queue


class QueueFPS(Queue):
    def __init__(self, maxsize=200):
        self.frame_count = 0
        ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)

    def put(self,*args, **kwargs):
        self.frame_count += 1
        print("count in put function: ", self.frame_count)
        super().put(*args, **kwargs)

    def get_count(self):
        print("count in get_count: ", self.frame_count)

But when I use this class, I find that get_count() always reports 0 when I run it with multiple processes:

def worker(test_queue):
    for i in range(2):
        test_queue.put("A")

def test_multi_process():
    test_queue = QueueFPS()
    test_process = multiprocessing.Process(
                    target=worker,
                    args=(test_queue,))
    test_process.start()
    test_process.join()
    test_queue.close()
    test_queue.join_thread()
    print(test_queue.get_count())

The output is:

count in put function:  1
count in put function:  2
count in get_count:  0

If I run it in a single process, the count is correct, but an exception is raised afterwards (part of the traceback is omitted):

def test_single_process():
    test_queue = QueueFPS()
    for i in range(2):
        test_queue.put("A")
    print(test_queue.get_count())

The output is:

count in put function:  1
count in put function:  2
count in get_count:  2
None
>>> Traceback (most recent call last):
   (some output omitted here)
BrokenPipeError: [Errno 32] Broken pipe
scott huang

2 Answers


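frame_count is not shared across processes, so you don't get the desired output when using multiple processes. If you instead print

self.qsize()

which is a method of the Queue class, you will see that it prints 2. Keep in mind that qsize() reports how many items are currently in the queue, not how many were ever put, and that it is approximate (on platforms such as macOS it raises NotImplementedError).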
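A minimal sketch of that check, using a plain multiprocessing.Queue instead of the subclass. worker() mirrors the question's function and count_via_qsize() is a hypothetical helper name; the sketch assumes a platform where qsize() is implemented (for example Linux or Windows).

```python
import multiprocessing


def worker(q):
    # Runs in the child process: two put() calls, as in the question.
    for _ in range(2):
        q.put("A")


def count_via_qsize():
    """Hypothetical helper: the parent's qsize() after the child has put two items."""
    q = multiprocessing.Queue(maxsize=200)
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()  # safe here: two tiny items easily fit in the pipe buffer
    # qsize() is approximate and may raise NotImplementedError on macOS.
    return q.qsize()


if __name__ == "__main__":
    print(count_via_qsize())  # 2, where qsize() is supported
```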

The reason is how Queue is implemented to be thread-safe and process-safe. The Python documentation says:

class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

Since the state of the frame_count attribute is not synchronised across processes, you can't get the desired output.

Sadanand Upase
  • I am aware that multiprocessing.Queue handles the synchronization between processes. Your point is that, by subclassing it, my own attribute can't benefit from that, so I need to implement my own synchronization? – scott huang Oct 24 '17 at 09:23
  • Yes. As per the implementation here [https://github.com/python/cpython/blob/master/Lib/queue.py], you need to use threading.Lock and threading.Condition to make sure that your changes are synchronised. – Sadanand Upase Oct 24 '17 at 09:37
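A minimal sketch of such synchronisation, assuming the counter should live in shared memory. Note that threading.Lock and threading.Condition only coordinate threads inside one process; a counter shared between processes needs a multiprocessing primitive such as multiprocessing.Value, which carries its own process-shared lock. The __getstate__/__setstate__ overrides are an assumption based on how CPython's multiprocessing.queues.Queue pickles itself (only its own fields, which matters under the "spawn" start method); worker() and run_demo() are hypothetical names.

```python
import multiprocessing
from multiprocessing.queues import Queue


class QueueFPS(Queue):
    """Queue that counts every successful put(), whichever process calls it."""

    def __init__(self, maxsize=200):
        ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # 64-bit integer in shared memory, with its own process-shared lock.
        self.frame_count = ctx.Value("q", 0)

    def __getstate__(self):
        # Queue pickles only its own fields, so append the shared counter.
        return (super().__getstate__(), self.frame_count)

    def __setstate__(self, state):
        base_state, self.frame_count = state
        super().__setstate__(base_state)

    def put(self, obj, block=True, timeout=None):
        super().put(obj, block, timeout)  # if this raises (e.g. queue.Full), nothing is counted
        with self.frame_count.get_lock():  # "+=" is read-modify-write, so hold the lock
            self.frame_count.value += 1

    def get_count(self):
        return self.frame_count.value


def worker(q):
    for _ in range(2):
        q.put("A")


def run_demo():
    q = QueueFPS()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    items = [q.get() for _ in range(2)]  # drain before joining the producer
    p.join()
    return items, q.get_count()


if __name__ == "__main__":
    print(run_demo())  # (['A', 'A'], 2)
```

Counting after super().put() returns means a put that fails is not counted, and unlike qsize() the count does not drop when items are taken out.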

You call the put method in a child process, so your put only changes the copy of the Queue object that lives in that process. Changes to its ordinary attributes do not propagate to the other processes automagically; you need to take special measures, such as keeping the counter in shared memory. (Queue itself uses a Pipe and custom logic to pass pickled items between processes, and that is the only thing that connects its copies.)
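To illustrate, here is a minimal sketch with a hypothetical Holder class and a multiprocessing.Value standing in for the "special measures": the child increments both, but only the shared-memory value is visible in the parent afterwards.

```python
import multiprocessing


class Holder:
    """Hypothetical plain object with an ordinary attribute."""

    def __init__(self):
        self.count = 0


def bump(holder, shared):
    holder.count += 1  # changes only this process's copy of holder
    with shared.get_lock():
        shared.value += 1  # writes to shared memory that the parent also sees


def compare():
    holder = Holder()
    shared = multiprocessing.Value("i", 0)
    p = multiprocessing.Process(target=bump, args=(holder, shared))
    p.start()
    p.join()
    return holder.count, shared.value


if __name__ == "__main__":
    print(compare())  # (0, 1)
```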

ivan_pozdeev