
I have created a simple publisher and a consumer that subscribes to the queue using basic.consume.

My consumer acknowledges a message when the job runs without an exception. Whenever I run into an exception, I don't ack the message and return early. Only the acknowledged messages disappear from the queue, so that's working correctly.
Now I want the consumer to pick up the failed messages again, but the only way to reconsume those messages is by restarting the consumer.

How should I approach this use case?

Setup code

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

Consumer code

$queue->consume(array($this, 'callback'));

public function callback(AMQPEnvelope $msg, AMQPQueue $queue)
{
    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return;
    }
    return $queue->ack($msg->getDeliveryTag());
}

Producer code

$exchange->publish('message');
pinepain
Bram Gerritsen

2 Answers


If a message was not acknowledged and the application fails, the message is redelivered automatically and the redelivered property on the envelope is set to true (unless you consume with the no-ack = true flag).

UPD:

You have to nack the message with the requeue flag in your catch block:

    try {
        //Do some business logic
    } catch (Exception $ex) {
        //Log exception
        return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
    }

Beware of messages that are nacked and requeued forever: AMQP 0-9-1 does not define a redelivery count, and RabbitMQ's classic queues do not track one.

If you don't want to deal with such messages and simply want to add some delay, you could call sleep() or usleep() before the nack call, but that is not a good idea at all: it blocks the consumer, so no other messages are processed in the meantime.

There are several techniques for dealing with the redelivery-loop problem:

1. Rely on Dead Letter Exchanges

  • pros: reliable, standard, clear
  • cons: requires additional logic

2. Use per message or per queue TTL

  • pros: easy to implement, also standard, clear
  • cons: with long queues you may lose some messages

Examples (note that the queue TTL must be passed as a number, while the message TTL can be anything that is a numeric string):

2.1. Per-message TTL:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'expiration' => '1000'
    )
);

2.2. Per-queue TTL:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish('message at ' . microtime(true));

3. Keep a redelivery count, or the number of redeliveries left (like the hop limit or TTL in the IP stack), in the message body or headers

  • pros: gives you extra control over message lifetime at the application level
  • cons: significant overhead, since you have to modify the message and publish it again; application-specific; not clear

Code:

$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');

$exchange->publish(
    'message at ' . microtime(true),
    null,
    AMQP_NOPARAM,
    array(
        'headers' => array(
            'ttl' => 100
        )
    )
);

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
        $headers = $msg->getHeaders();
        echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
        echo $msg->getDeliveryTag(), ' ';
        echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
        echo $msg->getBody(), PHP_EOL;

        try {
            //Do some business logic
            throw new Exception('business logic failed');
        } catch (Exception $ex) {
            //Log exception
            if (isset($headers['ttl'])) {
                // with ttl logic

                if ($headers['ttl'] > 0) {
                    $headers['ttl']--;

                    $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
                }

                return $queue->ack($msg->getDeliveryTag());
            } else {
                // without ttl logic
                return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
            }

        }

        return $queue->ack($msg->getDeliveryTag());
    }
);
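
For technique 1 (Dead Letter Exchanges), here is a minimal sketch of one common arrangement that also gives a delay between retries. It is not taken from the code above: the names 'my-exchange.retry' and 'my-queue.retry', the 5000 ms delay, the limit of 3 retries and all function names are assumptions made for illustration. The work queue dead-letters rejected messages to a retry exchange, and a consumer-less retry queue holds them until its TTL expires and then dead-letters them back to 'my-exchange'. The retry count is read from the x-death header that RabbitMQ adds when it dead-letters a message.

```php
<?php
// Sketch only: names, delay and retry limit are illustrative assumptions.

/** Arguments for the work queue: rejected messages go to the retry exchange. */
function workQueueArguments(string $retryExchange): array
{
    return ['x-dead-letter-exchange' => $retryExchange];
}

/** Arguments for the retry queue: hold each message for $delayMs, then send it back. */
function retryQueueArguments(string $workExchange, int $delayMs): array
{
    return [
        'x-message-ttl'          => $delayMs,
        'x-dead-letter-exchange' => $workExchange,
    ];
}

/** Number of times the message was rejected from $queueName, read from x-death. */
function rejectionCount(array $headers, string $queueName): int
{
    $deaths = $headers['x-death'] ?? [];
    if (!is_array($deaths)) {
        return 0;
    }
    $count = 0;
    foreach ($deaths as $death) {
        if (($death['queue'] ?? null) === $queueName
            && ($death['reason'] ?? null) === 'rejected') {
            // Brokers that do not send a 'count' field add one entry per death.
            $count += (int) ($death['count'] ?? 1);
        }
    }
    return $count;
}

/**
 * Declares the retry exchange and both queues. Assumes 'my-exchange' already
 * exists. Re-declaring an existing 'my-queue' with different arguments fails,
 * so the old queue has to be deleted first.
 */
function declareRetryTopology(AMQPChannel $channel, int $delayMs = 5000): AMQPQueue
{
    $retryExchange = new AMQPExchange($channel);
    $retryExchange->setName('my-exchange.retry');
    $retryExchange->setType(AMQP_EX_TYPE_FANOUT);
    $retryExchange->declareExchange();

    $retryQueue = new AMQPQueue($channel); // nothing consumes from this queue
    $retryQueue->setName('my-queue.retry');
    $retryQueue->setArguments(retryQueueArguments('my-exchange', $delayMs));
    $retryQueue->declareQueue();
    $retryQueue->bind('my-exchange.retry');

    $workQueue = new AMQPQueue($channel);
    $workQueue->setName('my-queue');
    $workQueue->setArguments(workQueueArguments('my-exchange.retry'));
    $workQueue->declareQueue();
    $workQueue->bind('my-exchange');

    return $workQueue;
}

/** To be called from the catch block of the consumer callback. */
function handleFailure(AMQPEnvelope $msg, AMQPQueue $queue, int $maxRetries = 3)
{
    if (rejectionCount($msg->getHeaders(), 'my-queue') < $maxRetries) {
        // nack without AMQP_REQUEUE: the broker dead-letters the message.
        return $queue->nack($msg->getDeliveryTag());
    }
    // Give up: ack so the message leaves the queue for good.
    return $queue->ack($msg->getDeliveryTag());
}
```

Because 'my-exchange' is a fanout exchange, a message coming back from the retry queue is delivered to every queue bound to it, so this sketch fits the single-queue setup from the question; with several bound queues a direct exchange and routing keys would probably be the safer choice.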

There may be other ways to better control the message redelivery flow.

Conclusion: there is no silver-bullet solution. You have to decide which solution fits your needs best, or come up with something else, but don't forget to share it here ;)

pinepain
  • Thanks for your answer. `redelivered` is indeed set to `true`, but I have to restart my blocking consumer to reconsume the message. – Bram Gerritsen Jul 16 '13 at 08:39
  • Thanks, this is exactly what I needed. Could you give me some directions/suggestions on how to prevent infinitely redelivered messages? It would be nice if I could delay the requeueing by a given number of seconds, so I don't overload my consuming server. – Bram Gerritsen Jul 16 '13 at 09:20
  • Thanks for the crystal-clear update; this will be a very good resource for others struggling with a similar issue. I already started implementing a DLX and have it working now. It is behaving exactly as I want. – Bram Gerritsen Jul 16 '13 at 11:00
  • Dead Letter Exchange ;) – Bram Gerritsen Jul 16 '13 at 13:22

If you do not want to restart the consumer, then the basic.recover AMQP command may be what you want. According to the AMQP protocol:

basic.recover(bit requeue)

Redeliver unacknowledged messages.

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
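
As far as I know, later releases of the php-amqp extension expose this command as AMQPChannel::basicRecover(); treat that as an assumption and check it against the version you have installed. A minimal sketch of a guarded call follows. The helper name recoverUnacked() is made up for illustration, and requeue defaults to true because RabbitMQ's support for this method is only partial.

```php
<?php
/**
 * Ask the broker to redeliver all unacknowledged messages on the channel.
 * $channel is expected to be an AMQPChannel; the method_exists() guard covers
 * client versions that do not provide basicRecover().
 */
function recoverUnacked($channel, bool $requeue = true): void
{
    if (!is_object($channel) || !method_exists($channel, 'basicRecover')) {
        throw new RuntimeException('This client does not expose basic.recover');
    }
    $channel->basicRecover($requeue);
}
```

In the consumer from the question, this could be called as recoverUnacked($channel) after a failed job instead of restarting the process. Note that it affects every unacknowledged message on that channel, not just the one that failed.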
mike
  • This method doesn't seem to be part of the client API I'm using. http://www.php.net/manual/en/book.amqp.php – Bram Gerritsen Jul 16 '13 at 08:53
  • RabbitMQ only partially supports this method; see the [official doc on it](https://www.rabbitmq.com/specification.html#method-status-basic.recover) – pinepain Feb 13 '14 at 21:11