
We have a Windows service which listens to a single RabbitMQ queue and processes the messages.

We would like to extend the same Windows service so that it can listen to multiple RabbitMQ queues and process the messages.

I am not sure if that is possible using multi-threading, as each thread would have to listen to (block on) its queue.

As I am very new to multi-threading, I need high-level guidance on the following points, which will help me start building the prototype.

  1. Is it possible to listen to multiple queues in a single application by using threading?
  2. If any single thread shuts down (due to an exception etc.), how can it be brought back without restarting the whole Windows service?
  3. Is there any design pattern or open-source implementation which can help me handle this kind of situation?
Aniruddh Parihar
Mahesh
  • Possibly a duplicate: [How to implement single-consumer-multi-queue model for rabbitMQ](http://stackoverflow.com/q/11357512/1768303) – noseratio Feb 18 '14 at 02:21
  • @Noseratio - no, I am not asking about a single consumer with multiple queues. There will be multiple consumers with multiple queues, but the implementation should be done within a single Windows service. So instead of writing a separate Windows service for each queue consumer, I would like to write a single Windows service which will listen to multiple queues and process the messages. – Mahesh Feb 18 '14 at 02:39
  • I concur, this does not seem to be a duplicate. – theMayer Feb 18 '14 at 03:03
  • @Mahesh, are you asking how to aggregate messages from multiple queues into a single queue, for consuming centrally? – noseratio Feb 18 '14 at 04:25
  • @Noseratio Sorry if my writing is not very clear. I mean a single Windows service which **listens** to multiple queues on different channels. In other words, a single application which acts like multiple consumers (each consumer will get messages from its queue based upon the routing key binding). Writing a separate Windows service for each consumer would not be challenging. Due to limited experience in threading, I am not able to think how I can achieve this within a single application by using threads. I hope this comment makes sense. Let me know if it's still confusing and I'll try to improve it further. – Mahesh Feb 18 '14 at 06:21
  • EasyNetQ (http://easynetq.com) is a pretty complete open source high-level API for RabbitMQ that does thread management, connection handling, error handling etc. out of the box. – Mike Hadlow Feb 18 '14 at 09:47
  • If you have any ability to configure the RabbitMQ routing setup (which you probably do) it is probably more easily done there. Create a new queue, and give it all of the bindings of all the queues that you want to listen to (ie, the union of their bindings). Then the new queue will get all the messages that the other queues do, and you can just listen to the one queue without the mess of multithreading. This is why RabbitMQ routing exists. – Brian Reischl Dec 11 '14 at 17:42

2 Answers


I like how you wrote your question: it started out very broad and then focused in on specifics. I have successfully implemented something very similar, and am currently working on an open-source project to take my lessons learned and give them back to the community. Unfortunately, I have yet to package my code up neatly, which doesn't help you much! Anyway, to answer your questions:

1. Is it possible to use threading for multiple queues?

A: Yes, but it can be full of pitfalls. In my experience, the RabbitMQ .NET library is not the best-written piece of code out there, and I have found it to be a relatively cumbersome implementation of the AMQP protocol. One of the most pernicious caveats is how it handles the "receiving" or "consuming" behavior, which can cause deadlocks quite easily if you aren't careful. Fortunately, this is well illustrated in the API documentation. My advice: if you can, use a singleton connection object. Then, in each thread, use that connection to create a new IModel (channel) and its corresponding consumers.
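The advice above (one shared connection, one IModel per queue) might look roughly like the sketch below. It is only an illustration under several assumptions: the RabbitMQ.Client NuGet package in a 6.x version (later major versions changed this API), a broker reachable on localhost, and two queues that already exist. The queue names `orders` and `invoices` are hypothetical. With `EventingBasicConsumer` the client library invokes the callback itself, so this sketch does not start a thread per queue explicitly.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class MultiQueueListener
{
    static void Main()
    {
        // Assumption: these queues already exist on the broker (hypothetical names)
        var queueNames = new[] { "orders", "invoices" };

        // Assumption: broker on localhost with default credentials
        var factory = new ConnectionFactory { HostName = "localhost" };

        // One connection shared by the whole process
        using (IConnection connection = factory.CreateConnection())
        {
            var channels = new List<IModel>();
            foreach (var queueName in queueNames)
            {
                // One channel (IModel) per queue, not shared between consumers
                IModel channel = connection.CreateModel();
                channels.Add(channel);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, ea) =>
                {
                    string text = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(queueName + ": " + text);

                    // Acknowledge on the same channel that delivered the message
                    channel.BasicAck(ea.DeliveryTag, false);
                };

                channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            }

            Console.WriteLine("Listening. Press Enter to stop.");
            Console.ReadLine();

            // Close the channels before the connection is disposed
            foreach (var channel in channels) channel.Dispose();
        }
    }
}
```

Keeping the acknowledgement inside the callback means each message is received, processed and acknowledged on the channel it arrived on.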

2. How to gracefully handle exceptions in threads - I believe this is another topic and I will not address it here as there are several methods you can use.
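As one illustration of the kind of method alluded to above (not necessarily what the answer's author had in mind), a worker can be wrapped in a small supervisor loop that restarts it after a failure, so that the rest of the service keeps running. This is a minimal sketch using only the Task Parallel Library; `Supervisor`, `RunWithRestartAsync` and the simulated worker are hypothetical names, and `async Main` assumes C# 7.1 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class Supervisor
{
    // Runs `work` until it returns normally or the token is cancelled.
    // If `work` throws, the failure is logged and the worker is started
    // again after `restartDelay`. Returns the number of restarts.
    public static async Task<int> RunWithRestartAsync(
        string name,
        Action<CancellationToken> work,
        TimeSpan restartDelay,
        CancellationToken token)
    {
        int restarts = 0;
        while (true)
        {
            try
            {
                // LongRunning hints that a blocking worker should get its own thread
                await Task.Factory.StartNew(
                    () => work(token), token,
                    TaskCreationOptions.LongRunning, TaskScheduler.Default);
                return restarts; // the worker finished on its own
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                return restarts; // orderly shutdown, do not restart
            }
            catch (Exception ex)
            {
                restarts++;
                Console.WriteLine($"{name} failed ({ex.Message}); restart #{restarts}");
            }

            // Wait a little before restarting, unless shutdown is requested meanwhile
            try { await Task.Delay(restartDelay, token); }
            catch (OperationCanceledException) { return restarts; }
        }
    }
}

class Demo
{
    static async Task Main()
    {
        var cts = new CancellationTokenSource();
        int attempts = 0;

        // Simulated worker: fails on the first two attempts, then completes
        int restarts = await Supervisor.RunWithRestartAsync(
            "queue-A",
            t => { if (++attempts < 3) throw new InvalidOperationException("simulated failure"); },
            TimeSpan.FromMilliseconds(50),
            cts.Token);

        Console.WriteLine($"attempts={attempts}, restarts={restarts}");
    }
}
```

In this demo the worker fails twice and succeeds on the third attempt, so the last line printed is `attempts=3, restarts=2`. In a real service each queue listener would get its own supervised task, and cancelling the token in the service's stop handler would end all of them.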

3. Any open-source projects? - I liked the thinking behind EasyNetQ, although I ended up rolling my own anyway. I'll hopefully remember to follow up when my open-source project is completed, as I believe it improves on EasyNetQ.

theMayer

You may find this answer very helpful. I have a very basic understanding of how RabbitMQ works, but I'd probably go on with one subscriber per channel per thread, as suggested there.

There is certainly more than one way to organize the threading model for this. The actual implementation will depend on how you need to process messages from multiple queues: either in parallel, or by aggregating them and serializing the processing. The following code is a console app which simulates the latter case. It uses the Task Parallel Library and the BlockingCollection class (which comes in very handy for this kind of task).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21842880
{
    class Program
    {
        BlockingCollection<object> _commonQueue;

        // process an individual queue
        void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();
                // get a message, this blocks and waits
                var message = queue.Take(token);

                // process this message
                // just place it to the common queue
                var wrapperMessage = "queue " + id + ", message: " + message;
                _commonQueue.Add(wrapperMessage);
            }
        }

        // process the common aggregated queue
        void ProcessCommonQueue(CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();

                // get a message, this blocks and waits
                var message = _commonQueue.Take(token);

                // process this message
                Console.WriteLine(message.ToString());
            }
        }

        // run the whole process
        async Task RunAsync(CancellationToken token)
        {
            var queues = new List<BlockingCollection<object>>();
            _commonQueue = new BlockingCollection<object>();

            // start individual queue processors
            var tasks = Enumerable.Range(0, 4).Select((i) =>
            {
                var queue = new BlockingCollection<object>();
                queues.Add(queue);

                return Task.Factory.StartNew(
                    () => ProcessQueue(i, queue, token), 
                    TaskCreationOptions.LongRunning);
            }).ToList();

            // start the common queue processor
            tasks.Add(Task.Factory.StartNew(
                () => ProcessCommonQueue(token),
                TaskCreationOptions.LongRunning));

            // start the simulators
            tasks.AddRange(Enumerable.Range(0, 4).Select((i) => 
                SimulateMessagesAsync(queues, token)));

            // wait for all started tasks to complete
            await Task.WhenAll(tasks);
        }

        // simulate a message source
        async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
        {
            var random = new Random(Environment.TickCount);
            while (true)
            {
                token.ThrowIfCancellationRequested();
                // pass the token so the delay ends promptly on cancellation
                await Task.Delay(random.Next(100, 1000), token);
                var queue = queues[random.Next(0, queues.Count)];
                var message = Guid.NewGuid().ToString() + " " +  DateTime.Now.ToString();
                queue.Add(message);
            }
        }

        // entry point
        static void Main(string[] args)
        {
            Console.WriteLine("Ctrl+C to stop...");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (s, e) =>
            {
                // cancel upon Ctrl+C
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                new Program().RunAsync(cts.Token).Wait();
            }
            catch (Exception ex)
            {
                if (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }
}

Another idea may be to use Reactive Extensions (Rx). If you can think of the arriving messages as events, Rx can help aggregate them into a single stream.
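A rough sketch of the Rx idea, assuming the System.Reactive NuGet package is referenced. The two `Subject<string>` instances are hypothetical stand-ins for the per-queue message sources; in real code the queue consumers would push into them.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class RxMergeDemo
{
    static void Main()
    {
        // Hypothetical stand-ins for two message sources
        var queue0 = new Subject<string>();
        var queue1 = new Subject<string>();

        // Tag each message with its source, then merge both into one stream
        IObservable<string> merged = Observable.Merge(
            queue0.Select(m => "queue 0, message: " + m),
            queue1.Select(m => "queue 1, message: " + m));

        // A single subscriber sees the messages from both sources
        using (merged.Subscribe(m => Console.WriteLine(m)))
        {
            queue0.OnNext("a");
            queue1.OnNext("b");
            queue0.OnNext("c");
        }
    }
}
```

The single subscriber plays the same role as the common queue processor in the BlockingCollection version: it receives the tagged messages from both sources in the order they were pushed.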

Community
noseratio
  • Based upon your implementation I have created my own. It does not use BlockingCollection, because receiving a message, processing it and acknowledging it to the broker have to be done on the same channel. https://gist.github.com/mahesh-singh/9214295 I am not sure if this implementation is the right one. – Mahesh Feb 25 '14 at 18:06
  • I have set up a new code review: http://codereview.stackexchange.com/questions/42836/listen-to-multiple-rabbitmq-queue-by-task-and-process-the-message – Mahesh Feb 26 '14 at 08:33
  • @Mahesh, you should probably link the original SO question there, to help other people who may be reviewing. – noseratio Feb 26 '14 at 08:35