5

I'm currently playing around with Rabbit-Mq, and am trying to implement a "dead-letter" queue, a queue for failed messages. I've been reading the rabbit documentation: https://www.rabbitmq.com/dlx.html.

and have come up with this example:

internal class Program
{
    private const string WorkerExchange = "work.exchange";
    private const string RetryExchange = "retry.exchange";
    public const string WorkerQueue = "work.queue";
    private const string RetryQueue = "retry.queue";

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(WorkerExchange, "direct");
                channel.QueueDeclare
                (
                    WorkerQueue, true, false, false,
                    new Dictionary<string, object>
                    {
                        {"x-dead-letter-exchange", RetryExchange},

                        // I have tried with and without this next key
                        {"x-dead-letter-routing-key", RetryQueue}
                    }
                );
                channel.QueueBind(WorkerQueue, WorkerExchange, string.Empty, null);

                channel.ExchangeDeclare(RetryExchange, "direct");
                channel.QueueDeclare
                (
                    RetryQueue, true, false, false,
                    new Dictionary<string, object> {
                        { "x-dead-letter-exchange", WorkerExchange },
                        { "x-message-ttl", 30000 },
                    }
                );
                channel.QueueBind(RetryQueue, RetryExchange, string.Empty, null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    Thread.Sleep(1000);
                    Console.WriteLine("Rejected message");

                    // also tried  channel.BasicNack(ea.DeliveryTag, false, false);
                    channel.BasicReject(ea.DeliveryTag, false);
                };

                channel.BasicConsume(WorkerQueue, false, consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Image of queue when publishing to worker queue: rabbit-mq worker queue stats

Image of the retry queue: rabbit-mq retry stats

I feel as though I'm missing some small details but can't seem to find what they are.

Thanks in advance

Mark Davies
  • 1,447
  • 1
  • 15
  • 30

3 Answers3

15

You should define your dead-letter-exchange as fanout.

There we go: channel.ExchangeDeclare(RetryExchange, "fanout");

If your dead letter exchange is setup as DIRECT you must specify a dead letter routing key. If you just want all your NACKed message to go into a dead letter bucket for later investigation (as I do) then your dead letter exchange should be setup as a FANOUT.

Look at this for more info

Ashkan Nourzadeh
  • 1,922
  • 16
  • 32
  • Thank you! Didn't see that answer, was searching for quite a while – Mark Davies Feb 02 '18 at 14:14
  • 3
    I also figured out that the "x-dead-letter-routing-key" wasn't working because I didn't specify a routing key on the binding and was assuming it would route via queue name, which was a poor, unfounded assumption – Mark Davies Feb 02 '18 at 14:38
7

Turns out that if a dead letter exchange is a direct exchange then the queue parameters require a x-dead-letter-routing-key. Above (in the question) I am using this key in the dictionary to try and route my messages but what I am not doing is adding a route to my binding, here is an updated version of the code that works:

internal class Program
{
    private const string WorkerExchange = "work.exchange";
    private const string RetryExchange = "retry.exchange";
    public const string WorkerQueue = "work.queue";
    private const string RetryQueue = "retry.queue";

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(WorkerExchange, "direct");
                channel.QueueDeclare
                (
                    WorkerQueue, true, false, false,
                    new Dictionary<string, object>
                    {
                        {"x-dead-letter-exchange", RetryExchange},
                        {"x-dead-letter-routing-key", RetryQueue}
                    }
                );
                channel.QueueBind(WorkerQueue, WorkerExchange, WorkerQueue, null);

                channel.ExchangeDeclare(RetryExchange, "direct");
                channel.QueueDeclare
                (
                    RetryQueue, true, false, false,
                    new Dictionary<string, object>
                    {
                        {"x-dead-letter-exchange", WorkerExchange},
                        {"x-dead-letter-routing-key", WorkerQueue},
                        {"x-message-ttl", 30000},
                    }
                );
                channel.QueueBind(RetryQueue, RetryExchange, RetryQueue, null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);

                    Thread.Sleep(1000);
                    Console.WriteLine("Rejected message");
                    channel.BasicNack(ea.DeliveryTag, false, false);
                };

                channel.BasicConsume(WorkerQueue, false, consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }

The difference being that the call to channel.QueueBind(WorkerQueue, WorkerExchange, WorkerQueue, null); now supplies the routing key to be the same as the queuename, so when the message "dead-letters" it gets routed to the exchange via this key

Mark Davies
  • 1,447
  • 1
  • 15
  • 30
0

The main thing to make it work is to bind WorkerExchange and WorkerQueue with the exact routing key. It should be configured via RabbitMq UI Manager for work.exchange.

Then add messages to the WorkerQueue via WorkerExchange using this routing key. Then these "dead queue" messages will be stored in retry.queue

channel.QueueDeclare
            (
                WorkerQueue, true, false, false,
                new Dictionary<string, object>
                {
                    {"x-dead-letter-exchange", RetryExchange}
                }
            );

This is what you need for queue declaration in your C# code.

Extra argument {"x-dead-letter-routing-key", RetryQueue} is not needed.

The following lines of code are not needed too:

channel.ExchangeDeclare(WorkerExchange, "direct");

channel.QueueBind(WorkerQueue, WorkerExchange, WorkerQueue, null);

channel.ExchangeDeclare(RetryExchange, "direct");
channel.QueueDeclare
            (
                RetryQueue, true, false, false,
                new Dictionary<string, object>
                {
                    {"x-dead-letter-exchange", WorkerExchange},
                    {"x-dead-letter-routing-key", WorkerQueue},
                    {"x-message-ttl", 30000},
                }
            );
channel.QueueBind(RetryQueue, RetryExchange, RetryQueue, null);

So you can remove the above lines.