11

The solutions in "RabbitMQ Wait for a message with a timeout" and "Wait for a single RabbitMQ message with a timeout" don't seem to work: there is no next-delivery method in the official C# library, and QueueingBasicConsumer is deprecated, so it just throws NotSupportedException everywhere.

How can I wait for a single message from a queue, with a specified timeout?

PS

It can be done through Basic.Get(), yes, but polling for messages at a specified interval is a bad solution (excess traffic, excess CPU).

Update

By its implementation, EventingBasicConsumer does NOT support immediate cancellation. Even if you call BasicCancel at some point, and even if you specify prefetch through BasicQos, it will still fetch in frames, and those frames can contain multiple messages. So it is not suitable for single-task execution. Don't bother: it just doesn't work with single messages.

eocron

1 Answer

10

There are many ways to do this. For example, you can use EventingBasicConsumer together with ManualResetEvent, like this (that's just for demonstration purposes; it is better to use one of the methods below):

var factory = new ConnectionFactory();
using (var connection = factory.CreateConnection()) {
    using (var channel = connection.CreateModel()) {
        // setup signal
        using (var signal = new ManualResetEvent(false)) {
            var consumer = new EventingBasicConsumer(channel);
            byte[] messageBody = null;                        
            consumer.Received += (sender, args) => {
                messageBody = args.Body;
                // process your message or store for later
                // set signal
                signal.Set();
            };               
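            // start consuming (second argument false = manual ack, so the message
            // stays unacknowledged until you call channel.BasicAck)
            channel.BasicConsume("your.queue", false, consumer);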
            // wait until message is received or timeout reached
            bool timeout = !signal.WaitOne(TimeSpan.FromSeconds(10));
            // cancel subscription
            channel.BasicCancel(consumer.ConsumerTag);
            if (timeout) {
                // timeout reached - do what you need in this case
                throw new Exception("timeout");
            }

            // at this point messageBody is received
        }
    }
}

As you stated in the comments, if you expect multiple messages on the same queue, this is not the best way. Well, it's not the best way in any case; I included it just to demonstrate the use of ManualResetEvent in case the library itself does not provide timeout support.
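The same wait-with-timeout idea can also be sketched with TaskCompletionSource instead of ManualResetEvent. This is only an illustration: `SingleMessageWaiter` and `TryWaitOne` are hypothetical names, not part of RabbitMQ.Client, and the sketch uses only the .NET base class library. Because TrySetResult succeeds only once, a handler that fires several times before cancellation takes effect cannot overwrite the first value.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of RabbitMQ.Client.
static class SingleMessageWaiter
{
    // Calls 'subscribe' with a callback, then blocks until the callback has been
    // invoked once or the timeout expires. Returns false on timeout.
    public static bool TryWaitOne<T>(Action<Action<T>> subscribe, TimeSpan timeout, out T result)
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

        // TrySetResult succeeds only for the first call; later values are ignored.
        subscribe(value => tcs.TrySetResult(value));

        if (tcs.Task.Wait(timeout))
        {
            result = tcs.Task.Result;
            return true;
        }

        result = default(T);
        return false;
    }
}
```

With the consumer from the example above, it could be wired up roughly like this (assuming `args.Body` is a `byte[]`, as in that example):

```csharp
byte[] body;
bool received = SingleMessageWaiter.TryWaitOne<byte[]>(
    handler => consumer.Received += (sender, args) => handler(args.Body),
    TimeSpan.FromSeconds(10),
    out body);
```

Note that this only ignores extra deliveries on the client side; any messages already pushed to the consumer would still have to be requeued.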

If you are doing RPC (remote procedure call, request-reply), you can use SimpleRpcClient together with SimpleRpcServer on the server side. The client side will look like this:

var client = new SimpleRpcClient(channel, "your.queue");
client.TimeoutMilliseconds = 10 * 1000;
client.TimedOut += (sender, args) => {
    // do something on timeout
};                    
var reply = client.Call(myMessage); // will return reply or null if timeout reached

An even simpler way: use the basic Subscription class (it uses the same EventingBasicConsumer internally, but supports timeouts, so you don't need to implement them yourself), like this:

var sub = new Subscription(channel, "your.queue");
BasicDeliverEventArgs reply;
if (!sub.Next(10 * 1000, out reply)) {
     // timeout
}
Evk
  • The first solution is invalid. BasicConsume is not guaranteed to stop consumption on BasicCancel; it can stop a bit later due to RabbitMQ's on-the-fly implementation (just try to consume one single message per request and you will see that in some cases messageBody is assigned several times). You will still need to requeue the excess messages. As for the second and third ones, I will try them out now =) – eocron May 31 '17 at 11:12
  • Though the upper part of your answer is irrelevant, the Subscription class is exactly what I wanted! Thank you, it works marvelously! Can you edit the answer so others will know that the last one worked out? – eocron May 31 '17 at 15:33
  • Still, by its implementation it stores a bunch of messages internally, whereas I need only one =/ – eocron Jun 01 '17 at 09:51
  • And why do you need to fetch with a timeout? Are you sending a request and waiting for a reply, or is it for some other reason? – Evk Jun 01 '17 at 10:54
  • Can you use `basicQos` to limit the prefetch count to 1, then disable auto-ack and ack your messages manually? Then you will get messages one by one, and RabbitMQ will not push more messages to your channel until you ack the current one (I think). – Evk Jun 01 '17 at 11:02
  • I wait for one message because I can process only one at a time AND to let other processes do some intermediate work (they share the same lock, so there is no need to fetch messages when I'm not able to process them). BasicQos is 1. I tried to ack manually and it seems to work well... still testing. – eocron Jun 01 '17 at 14:28
  • No, it doesn't work. It prefetches all the messages inside a single frame and makes them wait for ack... – eocron Jun 01 '17 at 15:17
  • So even if prefetch count = 1 it still prefetches more than one? – Evk Jun 01 '17 at 15:18
  • Yep, it does (well, it seems like a major bug). So there is no way around EventingBasicConsumer. – eocron Jun 01 '17 at 15:19