
How can I go about "selecting" on multiple queue.Queue objects simultaneously?

Golang has the desired feature with its channels:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

Here, whichever channel unblocks first has its corresponding block executed. How would I achieve this in Python?

Update0

Per the link given in tux21b's answer, the desired queue type has the following properties:

  • Multi-producer/multi-consumer queues (MPMC)
  • Per-producer FIFO/LIFO
  • When a queue is empty (full), consumers (producers) block

Furthermore, channels can be blocking: a producer will block until a consumer retrieves the item. I'm not sure Python's Queue can do this.
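For reference, queue.Queue has no such synchronous hand-off built in. A minimal sketch of one on top of threading.Condition, assuming a single shared condition variable is acceptable; the class name RendezvousChannel is hypothetical and not part of the standard library:

```python
import threading

class RendezvousChannel:
    """Hypothetical unbuffered channel: put() returns only after a get() took the item."""

    def __init__(self):
        self._cond = threading.Condition()
        self._item = None
        self._has_item = False  # True while an item waits in the hand-off slot
        self._taken = 0         # number of items handed off so far

    def put(self, item):
        with self._cond:
            # Wait until the single hand-off slot is free (other producers may use it).
            while self._has_item:
                self._cond.wait()
            self._item = item
            self._has_item = True
            ticket = self._taken + 1  # our item will be the ticket-th one taken
            self._cond.notify_all()   # wake waiting consumers
            # Block until a consumer has taken our item.
            while self._taken < ticket:
                self._cond.wait()

    def get(self):
        with self._cond:
            while not self._has_item:
                self._cond.wait()
            item = self._item
            self._item = None
            self._has_item = False
            self._taken += 1
            self._cond.notify_all()  # release our producer and any producers waiting for the slot
            return item
```

Each producer blocks until a consumer has taken its particular item, which mirrors an unbuffered Go channel. On its own this does not provide select over several such channels.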

Matt Joiner
  • To make sure I understand: you have more than one `Queue`, you want to use `Queue.get()` on each one (which is a blocking call), and the first one to unblock is the one you want to run with? Can you tell us why you don't just use one `Queue`? – Ethan Furman Dec 14 '11 at 22:33
  • @EthanFurman: That's right. One queue would require decorating messages with some kind of ID. It also means that different threads could not wait for different messages. – Matt Joiner Dec 14 '11 at 22:58
  • Looks like you have three good answers -- are any of them helpful? If not, can you give some more details, maybe some example Python usage? – Ethan Furman Dec 20 '11 at 16:29

4 Answers


If you use queue.PriorityQueue, you can get similar behaviour by using the channel objects as priorities:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                # select() yields the string 'default' when nothing was ready
                if channel != 'default':
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

Example output:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - No data received
MainThread - Consumers finished

In this example, ChannelManager is just a wrapper around queue.PriorityQueue that implements the select method as a context manager, to make it look similar to the select statement in Go.

A few things to note:

  • Ordering

    • In Go, the order in which the cases are written inside the select statement does not act as a priority: if more than one channel is ready, one of the ready cases is chosen pseudo-randomly.

    • In the Python example, the order is determined by the priority assigned to each channel. However, the priority is assigned dynamically to each channel (as seen in new_channel), so changing the ordering would be possible with a more complex select method that assigns new priorities based on an argument to the method. The old ordering could then be re-established once the context manager exits.

  • Blocking

    • In Go, the select statement blocks until one of its cases can proceed, unless a default case exists, in which case it does not block.

    • In the Python example, a boolean argument has to be passed to the select method to choose between blocking and non-blocking behaviour. In the non-blocking case, when nothing is available, the channel returned by the context manager is just the string 'default', so the code inside the with statement can easily detect it.

  • Threading: objects in the queue module are already safe for multi-producer, multi-consumer scenarios, as the example shows.
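One caveat with (channel, item) tuples: when two entries share a channel, PriorityQueue falls back to comparing the items themselves, so items within one channel come out sorted by value rather than in arrival order (and unorderable items raise TypeError). A minimal sketch of the usual fix, assuming a monotonically increasing counter as a tie-breaker; FifoPriorityQueue is a hypothetical name, and the integer priorities stand in for the Channel objects above:

```python
import itertools
import threading
from queue import PriorityQueue

class FifoPriorityQueue:
    """Hypothetical wrapper: ordered by priority, FIFO among equal priorities."""

    def __init__(self):
        self._queue = PriorityQueue()
        self._counter = itertools.count()
        self._lock = threading.Lock()  # guard the counter across producer threads

    def put(self, priority, item):
        with self._lock:
            seq = next(self._counter)
        # The unique sequence number breaks ties, so items are never compared.
        self._queue.put((priority, seq, item))

    def get(self, block=True, timeout=None):
        priority, _seq, item = self._queue.get(block, timeout)
        return priority, item
```

In the ChannelManager above, the same idea would mean storing (channel, seq, item) tuples and unpacking three values in select().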

jcollado
  • I don't think this answers the question -- you have loaded all three responses into the queue, but in his program the data would arrive asynchronously. If C2 gets there before C1, then C2 should be returned first, and it should be returned as soon as it is available. – Ethan Furman Dec 14 '11 at 21:28
  • @EthanFurman My understanding is that `q.get` blocks until an element is available and then returns it. Hence, if C2 is there before C1, then C2 will be returned and, after that, C1. The example goal was to show that the top-down evaluation in the `select` statement in go is actually equivalent to using priorities. Instead of priorities, timestamps can be used, but that's exactly the same as using a single queue, so maybe I'm missing something. – jcollado Dec 14 '11 at 21:53
  • Wow -- nice rewrite. I think you are answering the question now, but I don't know 'go' so can't say for sure. – Ethan Furman Dec 20 '11 at 03:52
  • I don't think this implementation allows for multiple consumers. Please advise. – Matt Joiner Dec 20 '11 at 23:39
  • @MattJoiner There shouldn't be any problem with multiple producers/consumers since the locking mechanism is already provided by the `queue` objects. I've reworked the example to highlight this. – jcollado Dec 21 '11 at 00:58
  • An impressive effort but I'm not communicating an important issue here. All your consumers are consuming from the same MultiQueue. It's a requirement that consumers can pick and choose which channels to get from. The producers also cannot be blocked until a receiver appears. There is no synchronous put. – Matt Joiner Dec 21 '11 at 02:43
  • If you want to select from different sets of channels, then look at my polling example. Adding exponential back-off times is quite easy, and that might already scale well. Depending on your use-cases, it might even be faster than blocking completely. – tux21b Dec 21 '11 at 09:19

The pychan project duplicates Go channels in Python, including multiplexing. It implements the same algorithm as Go, so it meets all of your desired properties:

  • Multiple producers and consumers can communicate through a Chan. When both a producer and a consumer are ready, the pair of them will unblock.
  • Producers and consumers are serviced in the order they arrived (FIFO)
  • An empty (full) queue will block consumers (producers).

Here's what your example would look like:

# Chan, chanselect and ChanClosed come from pychan; i2 is the value to send on c2.
c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(Full disclosure: I wrote this library)

Stu Gla

There are many different producer-consumer queue implementations available, such as queue.Queue. They typically differ in many properties, as listed in this excellent article by Dmitry Vyukov. As you can see, more than 10k different combinations are possible. The algorithms used for such queues also differ widely depending on the requirements. It's not possible to simply extend an existing queue algorithm to guarantee additional properties, since that normally requires different internal data structures and different algorithms.

Go's channels offer a relatively high number of guaranteed properties, so they are suitable for many programs. One of the hardest requirements there is support for reading from or blocking on multiple channels at once (the select statement) and choosing a channel fairly when more than one branch of a select statement can proceed, so that no messages are left behind. Python's queue.Queue doesn't offer these features, so it's simply not possible to achieve the same behavior with it.
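To make the fairness point concrete, here is a rough non-blocking approximation over plain queue.Queue objects that picks randomly among the queues that currently have data. It is a sketch under the assumption that a non-atomic choice is acceptable (the set of ready queues can change while it iterates); try_select is a hypothetical helper name:

```python
import queue
import random

def try_select(queues):
    """Return (index, item) from a randomly chosen ready queue, or None if none is ready.

    Each get_nowait() is atomic on its own, but the set of ready queues may
    change while we iterate; queues that were drained meanwhile are skipped.
    """
    order = list(range(len(queues)))
    random.shuffle(order)  # the first ready queue in a random order is a uniform pick
    for i in order:
        try:
            return i, queues[i].get_nowait()
        except queue.Empty:
            continue
    return None
```

It cannot block on several queues at once, which is exactly the part queue.Queue lacks.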

So, if you want to keep using queue.Queue, you need to find workarounds for that problem. However, the workarounds have their own drawbacks and are harder to maintain. Looking for another producer-consumer queue that offers the features you need might be a better idea! Anyway, here are two possible workarounds:

Polling

import queue
import time

# c1 and c2 are queue.Queue instances shared with the producers
while True:
    try:
        i1 = c1.get_nowait()
        print("received %s from c1" % i1)
    except queue.Empty:
        pass
    try:
        i2 = c2.get_nowait()
        print("received %s from c2" % i2)
    except queue.Empty:
        pass
    time.sleep(0.1)

This might use a lot of CPU cycles while polling the channels and might be slow when there are a lot of messages. Using time.sleep() with an exponential back-off time (instead of the constant 0.1 secs shown here) might improve this version drastically.
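A minimal sketch of the exponential back-off variant mentioned above, assuming the delay should reset on every call; the function name select_with_backoff and the min_delay/max_delay defaults are illustrative, not tuned values:

```python
import queue
import time

def select_with_backoff(queues, min_delay=0.001, max_delay=0.5):
    """Block until some queue has an item; return (index, item).

    Polls every queue with get_nowait(); after each empty round it sleeps,
    doubling the delay up to max_delay.
    """
    delay = min_delay
    while True:
        for i, q in enumerate(queues):
            try:
                return i, q.get_nowait()
            except queue.Empty:
                pass
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential back-off, capped
```

Checking the queues in a fixed order favours the earlier ones when several are ready; latency after an idle period is bounded by max_delay.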

A single notify-queue

# notify receives the id of a data queue each time a producer puts to that queue
queue_id = notify.get()
if queue_id == 1:
    i1 = c1.get()
    print("received %s from c1" % i1)
elif queue_id == 2:
    i2 = c2.get()
    print("received %s from c2" % i2)

With this setup, you must put something on the notify queue after putting to c1 or c2. This might work for you, as long as a single notify queue is enough (i.e. you do not have multiple "selects", each blocking on a different subset of your channels).
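A minimal sketch of both sides of this pattern, assuming integer ids map to the data queues; the helper names send and receive and the channels dict are hypothetical:

```python
import queue

c1, c2 = queue.Queue(), queue.Queue()
notify = queue.Queue()       # carries the id of the data queue that just got an item
channels = {1: c1, 2: c2}

def send(queue_id, item):
    """Put the item on its data queue first, then announce it on the notify queue."""
    channels[queue_id].put(item)
    notify.put(queue_id)

def receive():
    """Block on the notify queue, then fetch from whichever data queue it names."""
    queue_id = notify.get()
    return queue_id, channels[queue_id].get()
```

Putting the data before the notification means each announced id corresponds to an item that is already enqueued, so the consumer's get() on the data queue does not wait for data that has not yet been put.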

Alternatively, you could consider using Go. Go's goroutines and concurrency support are much more powerful than Python's limited threading capabilities anyway.

tux21b
  • +1 for the article link. The rest of your answer is speculation. – Matt Joiner Dec 20 '11 at 23:42
  • Hi Matt, could you be more precise? I would like to clarify the remaining points. – tux21b Dec 20 '11 at 23:50
  • Go's channels implementation maintains two linked lists of receiving and sending goroutines for each channel containing all goroutines which are currently blocked on that channel. The select statement uses those lists to enqueue the current goroutine on all of these channels in order to get notified later when one or more channels are ready. Python's queue.Queue implementation on the other hand just uses a single flag for locking. It doesn't record anything else. So, it's definitely not possible with queue.Queue, and that is no speculation at all ;) – tux21b Dec 21 '11 at 00:36
from queue import Queue

# these imports needed for example code
from threading import Thread
from time import sleep
from random import randint

class MultiQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queues = []

    def addQueue(self, queue):
        # Only put() is wrapped: Queue.put_nowait() calls self.put(), so it is
        # covered too, and wrapping both would notify the MultiQueue twice.
        queue.put = self._put_notify(queue, queue.put)
        self.queues.append(queue)

    def _put_notify(self, queue, old_put):
        def wrapper(*args, **kwargs):
            result = old_put(*args, **kwargs)
            self.put(queue)
            return result
        return wrapper

if __name__ == '__main__':
    # an example of MultiQueue usage

    q1 = Queue()
    q1.name = 'q1'
    q2 = Queue()
    q2.name = 'q2'
    q3 = Queue()
    q3.name = 'q3'

    mq = MultiQueue()
    mq.addQueue(q1)
    mq.addQueue(q2)
    mq.addQueue(q3)

    queues = [q1, q2, q3]
    for i in range(9):
        def message(i=i):
            print("thread-%d starting..." % i)
            sleep(randint(1, 9))
            q = queues[i%3]
            q.put('thread-%d ending...' % i)
        Thread(target=message).start()

    print('awaiting results...')
    for _ in range(9):
        result = mq.get()
        print(result.name)
        print(result.get())

Rather than trying to call .get() on several queues, the idea here is to have the queues notify the MultiQueue when they have data ready -- sort of a select in reverse. This is achieved by having MultiQueue wrap each managed Queue's put() and put_nowait() methods, so that when something is added to one of those queues, that queue is then put() into the MultiQueue, and a corresponding MultiQueue.get() retrieves the Queue that has data ready.

This MultiQueue is based on the FIFO Queue, but you could also use the LIFO or Priority queues as the base depending on your needs.

Ethan Furman
  • I don't believe your MultiQueue allows for blocking producers if there is no receiver. – Matt Joiner Dec 20 '11 at 23:54
  • @MattJoiner: I'm not sure what you mean -- once something is put on one of the managed queues, that queue will put itself on the MultiQueue, and there it will sit until some other process asks for the next available queue from the MultiQueue. – Ethan Furman Dec 21 '11 at 00:22
  • There are 2 issues with this: Firstly, there needs to be a way to have multiple MultiQueues, each with different channels attached, only removing messages from the channels when something calls MultiQueue.get. Your solution will not distribute the messages correctly. Secondly, a producer needs to be able to block until someone receives on that channel. This is also something the default Queue can't do at all. – Matt Joiner Dec 21 '11 at 00:35