4

I have two multiprocessing processes: one adds items to a queue, and the other needs to iterate through the items currently in the queue. How do I do that iteration? Alternatively, how do I convert the current queue contents to a list so I can iterate over it?

Some pseudocode:

import multiprocessing as mp

thequeue = mp.Queue()

def func1():
    while True:
        item = readstream()  # readstream() stands in for the real data source
        if item is not None:
            thequeue.put(item)

def func2():
    while True:
        for item in thequeue:    # This only works for Lists, how to do this for queues?
            if item == "hi":
                print(item)

def main():
    mp.Process(target=func1).start()
    mp.Process(target=func2).start()
bunbun
  • This is not a minimal working example https://stackoverflow.com/help/mcve. The question lacks a lot of detail, first being - how did you create threads in Python and what is the queue class you're using? – Victor Sergienko Jul 20 '18 at 01:39
  • Added more code @VictorSergienko – bunbun Jul 20 '18 at 01:47
  • This, I believe: https://stackoverflow.com/questions/21157739/how-to-iterate-through-a-python-queue-queue-with-a-for-loop-instead-of-a-while-l – Victor Sergienko Jul 20 '18 at 01:54

2 Answers

13

If you want to write your code in terms of a for loop, you can use the two-argument form of iter:

def func2():
    # iter(callable, sentinel) calls thequeue.get() until it returns None
    for item in iter(thequeue.get, None):
        if item == "hi":
            print(item)

To stop this loop, put a None into thequeue. If None can occur as a regular item in your data, use a different sentinel value as the stop signal instead.

Note that unlike a normal for loop, this will remove items from the queue, just like calling get manually would. There is no way to iterate through an inter-process queue without removing items.
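As a rough end-to-end sketch of this pattern: the consumer below loops with two-argument iter, and the parent process puts None on the queue once every producer has been joined. The function names producer and consumer, the sample items, and the choice of None as the sentinel are assumptions made for illustration, not part of the original question.

```python
import multiprocessing as mp


def producer(queue, items):
    """Put real items only; the parent process sends the stop signal."""
    for item in items:
        queue.put(item)


def consumer(queue):
    """Consume until None arrives; return how many items equalled 'hi'."""
    matches = 0
    # iter(callable, sentinel) calls queue.get() repeatedly and stops
    # as soon as a call returns the sentinel (None here).
    for item in iter(queue.get, None):
        if item == "hi":
            print(item)
            matches += 1
    return matches


if __name__ == "__main__":
    q = mp.Queue()
    worker = mp.Process(target=consumer, args=(q,))
    worker.start()
    producers = [
        mp.Process(target=producer, args=(q, ["hello", "hi"])),
        mp.Process(target=producer, args=(q, ["hi", "bye"])),
    ]
    for p in producers:
        p.start()
    for p in producers:
        p.join()
    q.put(None)  # every producer has exited, so None is the last item
    worker.join()
```

The consumer is started before the producers are joined so that the queue is drained while they run. Because the sentinel is sent from the parent only after all producers have exited, the producers themselves never need to know about it.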

user2357112
Sraw
  • Note that this might not work if there are multiple processes pushing into the queue. It would be difficult to figure out which one of the producers must push in None. In such cases, @user2357112's answer, though uglier, is easier to implement – gokul_uf Feb 04 '19 at 13:57
  • @gokul_uf You misunderstood. Join all producers in main thread and then put `None` in. Producers don't need to care about it. – Sraw Feb 04 '19 at 15:36
4

multiprocessing.Queue doesn't support iteration directly, because looping over a container with for is expected not to modify the container. Such nondestructive iteration is both impossible to support in the multiprocessing.Queue implementation and a fundamentally inappropriate operation for the use cases multiprocessing.Queue was designed for.

Consumers should use get, which retrieves and removes items from the queue:

def func2():
    while True:
        item = thequeue.get()
        if item == 'hi':
            print(item)

If you prefer the code structure of a for loop, you can use two-argument iter as shown in Sraw's answer, but you'll still remove items from the queue that way. It is not possible to iterate over a multiprocessing.Queue without removing items.
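For the "convert the current queue to a list" part of the question, one possible approach is to drain the queue with get until nothing arrives within a short timeout. This is only a sketch: drain_to_list and its timeout default are hypothetical names and values chosen here, and the items are still removed from the queue, as described above.

```python
import multiprocessing as mp
import queue  # needed only for the queue.Empty exception


def drain_to_list(q, timeout=0.1):
    """Remove and return the items that can currently be read from q.

    Stops as soon as no item arrives within `timeout` seconds.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items
```

A short timeout is used rather than get_nowait because items that were just put may still be in transit to the underlying pipe. The returned list is a snapshot: anything a producer adds after the drain finishes is not included, and the drained items are no longer available to other consumers.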

user2357112
  • How is it "fundamentally inappropriate"? As the other answer shows, it's trivial to build and use a generator for it. – smheidrich Sep 30 '20 at 04:15
  • @pmos: Iterating over a container with a standard `for` loop is nondestructive, but `multiprocessing.Queue` cannot support nondestructive iteration, and nondestructive iteration would run counter to the use of `multiprocessing.Queue` as a message-passing synchronization mechanism. – user2357112 Sep 30 '20 at 04:40
  • You can use `iter` to construct an iterator fed from the queue and then iterate over that, but you cannot iterate over a queue directly, and `multiprocessing.Queue` will almost certainly never support direct iteration. – user2357112 Sep 30 '20 at 04:44
  • Also, that's not a generator. Generators are a specific type of iterator, which is not the type that two-argument `iter` creates. – user2357112 Sep 30 '20 at 04:49
  • Fair enough on this not being a generator, but you say yourself that iteration with `for` is only nondestructive for *containers* - destructive iteration over non-containers with `for` is totally idiomatic and commonplace Python: e.g. iterating over a file object or standard stream (yielding lines if it's opened in text mode), generators, ... – smheidrich Sep 30 '20 at 17:02