53

The longer I keep my RabbitMQ server running, the more trouble I have with unacknowledged messages. I would love to requeue them. There seems to be an AMQP command for this (basic.recover), but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but either I am missing something or it cannot be done this way. (How about with rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost', port=5672,
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    # Ask the broker to redeliver this channel's unacked messages
    channel.basic_recover(callback=handle_delivery, requeue=True)

try:
    connection = pika.SelectConnection(parameters=parameters,
        on_open_callback=on_connected)

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
Will Olbrys
  • Have you been able to resolve this? – 13hsoj Aug 08 '13 at 10:44
  • https://stackoverflow.com/questions/8296201/when-does-an-amqp-rabbitmq-channel-with-no-connections-die SO answer has potentially what is needed depending on why you have other channels still hanging around with unacked messages. Zombie channels. Not dup, since this topic is about messages in other channels, and not the channels itself. – Gerard ONeill Jun 23 '17 at 23:15

3 Answers

82

Unacknowledged messages are those that have been delivered across the network to a consumer but not yet ack'ed or rejected, while that consumer still holds open the channel or connection over which it received them. The broker therefore can't tell whether the consumer is just taking a long time to process those messages or has forgotten about them, so it leaves them in an unacknowledged state until either the consumer dies or they get ack'ed or rejected.

Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers to make decisions about each message as they get processed rather than leaving old messages unacknowledged.
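As a rough illustration of "make a decision about each message", here is a minimal sketch of a consumer callback that always ends in exactly one ack or nack. It assumes the callback signature used by pika 1.x (channel, method, properties, body); `make_callback` and `handler` are hypothetical names introduced for this example, not part of pika.

```python
import logging

log = logging.getLogger(__name__)


def make_callback(handler):
    """Wrap a handler so every delivery is settled with one ack or nack.

    `handler` is a hypothetical function that takes the message body,
    returns normally on success and raises on failure.
    """
    def on_message(channel, method, properties, body):
        try:
            handler(body)
        except Exception:
            log.exception("handler failed for delivery %s", method.delivery_tag)
            # Settle the message instead of leaving it unacked.
            # requeue=False discards it (or dead-letters it if the queue
            # is configured that way); pass requeue=True to retry instead.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message
```

The returned function would be passed as the message callback when subscribing with manual acknowledgements; whether a failed message should be requeued or dropped depends on your application.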

gdw2
Brian Kelly
  • 1
    so the basic.recover _must_ be called by the consumer? im using celeryd to manage connections. it might be possible to send that recover command to poorly responsive queues with celeryctl (if youre familiar with that...) – Will Olbrys Aug 15 '11 at 23:12
  • 4
    @will my condolences that you are using Celery. The celery developers simply do not understand AMQP and have created a badly broken implementation. You need to make a choice, either get rid of celery and do AMQP right, or stop using AMQP with celery and use something simple like Redis instead. I chose to drop celery and stay with AMQP. – Michael Dillon Aug 16 '11 at 01:53
  • 6
    thats quite an indictment. if you don't mind me asking, what about celery's AMQP implementation is not executed properly? – Will Olbrys Aug 16 '11 at 09:52
  • 8
    I agree, I'd like to hear why you think Celery is so badly broken. It's very widely used and this is the first time I've heard this complaint. – Jeremy Dunck Jan 19 '12 at 22:20
28

If messages are unacked there are only two ways to get them back into the queue:

  1. basic.nack

    Sent with requeue=true, this command causes the message to be placed back into the queue and redelivered.

  2. Disconnect from the broker

    This action will force all unacked messages from this channel to be put back into the queue.

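NOTE: basic.recover only affects unacked messages on the channel that issues it. Per the AMQP spec, with requeue=false the broker redelivers them to the original consumer on that channel (sometimes the desired behaviour), while with requeue=true it requeues them, so they may go to a different consumer.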

RabbitMQ spec for basic.recover and basic.nack
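The two options above can be sketched as follows. This is only an illustration, assuming a pika channel and connection owned by the consumer that holds the unacked messages; `release_unacked` and `release_by_disconnect` are hypothetical helper names. Neither call can reach messages held by some other consumer's channel.

```python
def release_unacked(channel, last_delivery_tag):
    """Return this channel's outstanding deliveries to the queue.

    multiple=True makes the nack cover every unacked delivery up to and
    including last_delivery_tag; requeue=True puts them back in the queue.
    """
    channel.basic_nack(
        delivery_tag=last_delivery_tag,
        multiple=True,
        requeue=True,
    )


def release_by_disconnect(connection):
    """Close the connection so the broker requeues everything unacked on it."""
    connection.close()
```

Here `last_delivery_tag` is assumed to be the highest delivery tag the consumer has received so far on that channel.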


The real question is: Why are the messages unacknowledged?

Possible scenarios to cause unacked messages:

  1. Consumer fetching too many messages, then not processing and acking them quickly enough.

    Solution: Prefetch as few messages as appropriate.

  2. Buggy client library. (I currently have this issue with pika 0.9.13: if the queue has a lot of messages, a certain number of them get stuck unacked, even hours later.)

    Solution: I have to restart the consumer several times until all unacked messages are gone from the queue.
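For the first scenario, limiting prefetch might look like the sketch below. It assumes the pika 1.x BlockingChannel argument names (older releases name some of these arguments differently); `start_consumer` is a hypothetical helper, and `on_message` is whatever callback your consumer already uses.

```python
def start_consumer(channel, queue_name, on_message, prefetch_count=1):
    """Subscribe with a prefetch limit and manual acknowledgements."""
    # Cap how many unacked deliveries the broker will push to this channel.
    channel.basic_qos(prefetch_count=prefetch_count)
    # auto_ack=False: the callback must ack or nack each message itself.
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=on_message,
        auto_ack=False,
    )
    # Blocks and dispatches deliveries to on_message until stopped.
    channel.start_consuming()
```

With a prefetch count of 1 the broker sends the next message only after the previous one has been settled, which trades some throughput for a tight bound on the number of unacked messages per consumer.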

Josh Adams
IvanD
  • Has your issue with pika been reported? Can you provide a link? – istepaniuk Nov 21 '14 at 11:19
  • It's a python recursion limit that kicked in. Something about not being able to recurse >1000 times, which is what was apparently happening with pika 0.9.13. Not seeing it with 0.9.14. – IvanD Nov 23 '14 at 03:30
  • 3
    Finally found where the issue was reported: https://github.com/pika/pika/issues/286 – IvanD Nov 24 '14 at 22:30
  • https://www.rabbitmq.com/consumers.html#acknowledgement-timeout this setting is there in new version to avoid consumer timeout. – Rakesh Sharma Aug 20 '21 at 09:36
  • Another possible scenario: Glitch on internet connection when getting messages in RabbitMQ between them being sent and my computer acking them. – nsandersen Nov 22 '22 at 09:30
6

All unacknowledged messages return to the ready state once all the workers/consumers are stopped.

Make sure every worker is really stopped by grepping the output of ps aux, and stop or kill any that are still running.

If you manage workers with supervisor, it may report a worker as stopped while a zombie process is still running; grep the ps aux output to check. Killing those zombie processes brings the messages back to the ready state.
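To confirm that messages really moved back to ready after stopping the workers, one option is to read the per-queue counters from rabbitmqctl. The sketch below assumes `rabbitmqctl` is on the PATH with permission to talk to the broker, and that its list output is tab-separated; `parse_list_queues` and `queues_with_unacked` are hypothetical helper names.

```python
import subprocess


def parse_list_queues(output):
    """Parse tab-separated `name ready unacked` rows, skipping other lines."""
    rows = []
    for line in output.splitlines():
        parts = line.split("\t")
        if len(parts) != 3:
            continue
        name, ready, unacked = parts
        if not (ready.isdigit() and unacked.isdigit()):
            continue  # header row or informational line
        rows.append((name, int(ready), int(unacked)))
    return rows


def queues_with_unacked():
    """Run rabbitmqctl and keep only queues that still hold unacked messages."""
    result = subprocess.run(
        ["rabbitmqctl", "-q", "list_queues",
         "name", "messages_ready", "messages_unacknowledged"],
        capture_output=True, text=True, check=True,
    )
    return [row for row in parse_list_queues(result.stdout) if row[2] > 0]
```

If `queues_with_unacked()` still returns rows after every worker is supposedly stopped, some process is most likely still holding a connection open.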

Venkat Kotra
  • You can also determine if a rabbit connection is being held up by a zombie process, by using the RabbitMQ management console, as I described here: http://stackoverflow.com/questions/11926077/rabbitmq-messages-remain-unacknowledged/43026774#43026774 – Shlomi Uziel Mar 26 '17 at 08:47