3

I am setting up a standard standalone thread listening to RabbitMQ in C#. Suppose the method for listening in the thread looks like this:

public void Listen()
{
    using (var channel = connection.CreateModel())
    {
        var consumer = SetupQueues(channel);
        while (true)
        {
            var ea = consumer.Queue.Dequeue();    // blocking call
            handler.HandleMessage(channel, ea);
        }
    }
}

What is an elegant way of halting consumption of messages gracefully in the C# client for RabbitMQ? Keep in mind I have found nothing of use in the RabbitMQ examples/docs or these SO questions:

The issue here is consumer.Queue.Dequeue() is a blocking call. I have tried these options:

  • Calling channel.BasicCancel(string tag). This causes a System.IO.EndOfStreamException in the blocking call. I do not want to use this exception as part of the control flow for obvious reasons.

  • Calling consumer.Queue.Dequeue(int millisecondsTimeout, out T result) and checking a flag in between loop iterations. This can work but seems hacky.

I want to let the thread exit gracefully and clean up any unmanaged resources I might have, so no thread aborting, etc.

Any help is appreciated. Thanks

Community
  • 1
  • 1
Ralphie
  • 135
  • 2
  • 7
  • 1
    As far as I'm aware there's no 'clean' way to do this. I use the timeout method personally - my consumer thread checks a flag and based on this decides what to do. This actually seems to work ok. Essentially, the application's state determines whether the queue should be consumed or not - and the consumer loop just checks whether it should consume or not before it attempts to. – HalliHax Sep 29 '14 at 17:17

2 Answers2

1

The DeQueue with the timeout & flag is the way to do it. It's a very common pattern, and is why many blocking calls are provided with timeout-enabled versions.

Alternately, throwing a (known) exception isn't necessarily a bad thing for control flow. Gracefully shutting down could mean actually trapping the exception, commenting "this is thrown when requesting the channel shuts down", and then returning cleanly. This is how part of TPL works with the CancellationToken.

Bryan Boettcher
  • 4,412
  • 1
  • 28
  • 49
  • Understood, thanks for the clarification. As far as using the exception goes, I would be happy to use it, however, the same exception (`System.IO.EndOfStreamException: SharedQueue closed`) is thrown when the 'SharedQueue` is closed by other means (network failure, heartbeat missed, etc.) – Ralphie Sep 30 '14 at 19:44
  • Yeah that's unfortunate. TPL does it the same way, they throw a ThreadAbortException, which can happen for a variety of reasons. I never understood why -- exceptions are cheap for a developer to create, why don't they use more OO? :( – Bryan Boettcher Sep 30 '14 at 20:42
1

The blocking methods are not property event-driven. I don't why they suggest to use consumer.Queue.Dequeue();

Anyway, I usually don't use consumer.Queue.Dequeue();

I extend the default consumer, in this way:

class MyConsumer : DefaultBasicConsumer {

  public MyConsumer(IModel model):base(model)
  {

  }
  public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) {
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
  }
}


class Program
{
  static void Main(string[] args)
  {

    var factory = new ConnectionFactory() { Uri = "amqp://aa:bbb@lemur.cloudamqp.com/xxx" };
    using (var connection = factory.CreateConnection())
    {
      using (var channel = connection.CreateModel())
      {
      channel.QueueDeclare("hello", false, false, false, null);
      var consumer = new MyConsumer(channel);
      String tag = channel.BasicConsume("hello", true, consumer);
      Console.WriteLine(" [*] Waiting for messages." +
                               " any key to exit");
      Console.ReadLine();
      channel.BasicCancel(tag);


        /*while (true)
        {
          /////// DON'T USE THIS   
          var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
          var body = ea.Body;
          var message = Encoding.UTF8.GetString(body);
          Console.WriteLine(" [x] Received {0}", message);
        }*/
      }
    }


  }
}

In this way you don't have a blocking method, and you can release all the resources correctly.

EDIT

I think using ctrl+C to break a program is always wrong.

Nissa
  • 4,636
  • 8
  • 29
  • 37
Gabriele Santomaggio
  • 21,656
  • 4
  • 52
  • 52