I've got a Python worker client that spins up 10 workers, each of which hooks onto a RabbitMQ queue. A bit like this:

#!/usr/bin/python
import multiprocessing

import pika

worker_count = 10
qname = 'task_queue'  # hypothetical queue name


def mqworker(qname):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback, queue=qname, no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork()  # placeholder for the real work
    ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker, args=(qname,))
        worker.start()

The issue is that despite setting basic_qos on the channel, the first worker to start takes all the messages off the queue while the others sit idle. I can see this in the RabbitMQ management interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to stay ready.

Why isn't this working?

growse

1 Answer

I appear to have solved this by moving where basic_qos is called.

Placing it just after channel = connection.channel(), and therefore before basic_consume, appears to alter the behaviour to what I'd expect.
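A minimal sketch of that reordering, assuming a pika 1.x install (where the consume call takes `on_message_callback` and `auto_ack` rather than the older `no_ack` form used in the question). The helper name `configure_channel` and its parameters are made up for illustration; the only point is that `basic_qos` runs before `basic_consume`.

```python
def configure_channel(channel, qname, on_message):
    """Declare the queue, set the prefetch limit, then register the consumer."""
    channel.queue_declare(queue=qname, durable=True)
    # Set the limit BEFORE registering the consumer, so the broker does not
    # push more than one unacknowledged message to this worker at a time.
    channel.basic_qos(prefetch_count=1)
    # pika 1.x keyword names (assumption); older releases used
    # basic_consume(callback, queue=qname, no_ack=False).
    channel.basic_consume(queue=qname, on_message_callback=on_message, auto_ack=False)


def mqworker(qname, on_message, host="mqhost"):
    """Worker entry point: connect, configure the channel, and block consuming."""
    import pika  # imported here so configure_channel can be tried without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    configure_channel(channel, qname, on_message)
    channel.start_consuming()
```

Each worker process would then be started with something like `multiprocessing.Process(target=mqworker, args=(qname, callback))`, with the callback still acknowledging each delivery exactly once.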

growse
  • Thank you! That did solve the issue. And by the way, this is very hard to debug. – Sajuuk May 08 '17 at 06:06
  • @Hiagara yeah, just ran into this today myself. Amazing that almost 5 years later this is still not clear or documented in the API. – Jordan May 08 '17 at 21:57
  • I think we should call `basic_qos` before `basic_consume`, because `basic_consume` uses this setting when it is initialized. – rborodinov May 24 '18 at 06:43
  • Agreed with @rborodinov. I had `basic_qos` right after `basic_consume` and it didn't work. Switched them, now it works fine. – Highstaker Jun 27 '18 at 09:36
  • I also had to set `auto_ack=False` when setting up the `basic_consume` for it to work. Otherwise it still consumed more messages than expected. – Tobias Jan 09 '20 at 22:01
  • My `.ack()` was in a loop inside the callback, so it was called more than once for each `delivery_tag`, resulting in a RabbitMQ 406 PRECONDITION_FAILED - unknown delivery tag. – jkulak Feb 13 '22 at 19:35