
I have one queue with N messages. I would like to create a console application (a consumer) that creates a given number of tasks/threads; each thread should take one message from the queue, process it, and then take the next message, until the queue is empty.

Up to this point, this is what I have:

private static void Main(string[] args)
{
    var runner = new Runner();
    for (var j = 0; j < 5; j++)
    {
        var task = Task.Factory.StartNew(() => { runner.ReceiveMessageFromServer(); });
    }
}

and

public class QRegistrator : IDisposable
{
    private const string activeMqUri = "activemq:tcp://localhost:61616";

    private readonly IConnection _connection;

    private readonly ISession _session;

    public QRegistrator(/*string activeMqUri*/)
    {
        var connectionFactory = new ConnectionFactory(activeMqUri);
        _connection = connectionFactory.CreateConnection();
        _connection.Start();
        _session = _connection.CreateSession();
    }

    public void Dispose()
    {
        _connection?.Close();
    }

    public IMessageConsumer CreateConsumer(string queueName, string topicName)
    {
        // there has to be a new session for each concurrent consumer
        var session = _connection.CreateSession();
        var queue = new ActiveMQQueue(queueName);
        var consumer = string.IsNullOrWhiteSpace(topicName) ? session.CreateConsumer(queue) : session.CreateConsumer(queue, $"topic = '{topicName}'");
        return consumer;
    }
}

and in class Runner

public void ReceiveMessageFromServer()
{
    var consumer = _registrator.CreateConsumer("myQueue", null);

    while (true)
    {
        ITextMessage message = consumer.Receive() as ITextMessage;
        if (message == null)
        {
            break;
        }
        Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
        DoSomething();
    }
}

But when I run this code, sometimes it does not create a connection, sometimes it does not create a session, and so on. I really don't understand why. When I try it without the for loop (but still inside a task), it behaves the same. When I try it without a task and call only runner.ReceiveMessageFromServer(), it works fine. Can anybody please tell me what I am doing wrong?

VMAtm
Patrik.Turcaj
  • You have multiple threads sharing a `runner`; is runner thread-safe? Also, you don't wait for the threads to complete before ending the app. – Scott Chamberlain Apr 10 '17 at 19:38
  • I don't think you're going to get any performance increase from attempting parallel dequeues from the MQ (and you may well run into trouble). I would dequeue using a single thread into a local, thread-safe queue, then process the messages in the local queue in parallel. – spender Apr 10 '17 at 19:45
  • @ScottChamberlain you are right! What a silly mistake. I fixed it, but now, when I run the program, only one task receives messages from the queue and the other tasks wait until that task is done; then the next task receives a message. What do I have to do to make them run in parallel? – Patrik.Turcaj Apr 10 '17 at 20:30
  • Your problem is the prefetchPolicy. – Hassen Bennour Apr 11 '17 at 12:54
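
The approach spender describes in the comments (one receiver feeding a local thread-safe queue, several workers draining it) can be sketched roughly as follows. This is only an illustration under stated assumptions: `LocalQueueDemo`, `Run`, and the generated `message-N` strings are hypothetical stand-ins, and the receiver loop here fabricates messages where real code would call `consumer.Receive()`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class LocalQueueDemo
{
    // Hypothetical helper: returns how many messages the workers processed.
    public static int Run(int messageCount, int workerCount)
    {
        var processed = new ConcurrentBag<string>();

        // Bounded so the receiver cannot run arbitrarily far ahead of the workers.
        using (var localQueue = new BlockingCollection<string>(boundedCapacity: 100))
        {
            // Single receiver: stand-in for a loop around consumer.Receive().
            Task receiver = Task.Run(() =>
            {
                for (var i = 0; i < messageCount; i++)
                {
                    localQueue.Add($"message-{i}");
                }
                // Signals the workers that no more messages will arrive.
                localQueue.CompleteAdding();
            });

            // Workers: each blocks until a message is available and exits
            // once the queue is empty and marked complete.
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (var message in localQueue.GetConsumingEnumerable())
                    {
                        processed.Add(message); // stand-in for DoSomething()
                    }
                }))
                .ToArray();

            receiver.Wait();
            Task.WaitAll(workers);
        }

        return processed.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run(messageCount: 20, workerCount: 5));
    }
}
```

One thing to keep in mind with this design: if the broker session acknowledges messages automatically, a message is acknowledged when it is received, not when a worker finishes with it, so a crash can lose messages that are still sitting in the local queue.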

2 Answers


Your problem is the prefetchPolicy. The default prefetch limits are:

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

With the default, all messages are dispatched to the first consumer that connects, and a consumer that connects later does not receive any. To change this behavior when you have concurrent consumers on a queue, set the prefetchPolicy to a lower value than the default. For example, add jms.prefetchPolicy.queuePrefetch=1 to the URI config in activemq.xml, or set it on the client URL.

I don't know if it is possible with C#, but try this:

private const string activeMqUri = "activemq:tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1";
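
For the .NET client specifically, the option name may differ from the Java one. As far as I know, Apache.NMS.ActiveMQ reads connection-level options with an `nms.` prefix rather than `jms.`, and also accepts a per-destination `consumer.prefetchSize` option on the queue name. Treat both spellings below as assumptions to verify against the NMS URI configuration docs for your client version; `PrefetchConfig` and its members are hypothetical names used only for this sketch.

```csharp
using Apache.NMS;
using Apache.NMS.ActiveMQ.Commands;

public static class PrefetchConfig
{
    // Assumption: connection-wide prefetch of 1, using the NMS "nms." prefix.
    public const string BrokerUri =
        "activemq:tcp://localhost:61616?nms.prefetchPolicy.queuePrefetch=1";

    // Assumption: per-destination alternative that limits prefetch
    // for consumers of this one queue only.
    public static IMessageConsumer CreateConsumer(ISession session)
    {
        var queue = new ActiveMQQueue("myQueue?consumer.prefetchSize=1");
        return session.CreateConsumer(queue);
    }
}
```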

If that does not work, you can set it on the broker side by adding this to the activemq.xml file:

<destinationPolicy>
    <policyMap>
      <policyEntries>
        <!--  http://activemq.apache.org/per-destination-policies.html -->
        <policyEntry queue=">" queuePrefetch="1" > 
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>

Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.

Take a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html

And

http://activemq.apache.org/destination-options.html

Hassen Bennour

Side notes: you do not wait for your tasks in the Main method. You can do that with the Task.WaitAll method, which blocks the main thread until all of the tasks have completed. Also, avoid the StartNew method; in general it is better to use Task.Run.

Now, back to messages. According to the current RabbitMQ .NET client docs:
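
A minimal sketch of the two side notes, assuming a hypothetical `Work` method that stands in for `runner.ReceiveMessageFromServer()` (the class name, `RunWorkers`, and the `Processed` counter are illustrative only):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Counts finished workers; updated atomically because workers run in parallel.
    public static int Processed;

    // Hypothetical stand-in for runner.ReceiveMessageFromServer().
    public static void Work(int workerId)
    {
        Thread.Sleep(100); // simulate receiving and processing messages
        Interlocked.Increment(ref Processed);
        Console.WriteLine($"Worker {workerId} finished");
    }

    public static void RunWorkers(int workerCount)
    {
        // Task.Run instead of Task.Factory.StartNew; keep the tasks so they can be awaited.
        Task[] tasks = Enumerable.Range(0, workerCount)
            .Select(id => Task.Run(() => Work(id)))
            .ToArray();

        // Block until every worker has completed, so the process does not exit early.
        Task.WaitAll(tasks);
    }

    public static void Main()
    {
        RunWorkers(5);
        Console.WriteLine($"Workers completed: {Processed}");
    }
}
```

Without the `Task.WaitAll` call, `Main` can return while the tasks are still running, and because task threads are background threads the process ends with them unfinished.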

Concurrency Considerations for Consumers

Each IConnection instance is, in the current implementation, backed by a single background thread that reads from the socket and dispatches the resulting events to the application.

As of version 3.5.0 application callback handlers can invoke blocking operations (such as IModel.QueueDeclare or IModel.BasicCancel). IBasicConsumer callbacks are invoked concurrently.

So, according to this question and its answers, you need to use a different class for your consumer, EventingBasicConsumer, like this:

var channel = _connection.CreateModel();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
    {
        var body = ea.Body;
        // ... process the message
        channel.BasicAck(ea.DeliveryTag, false);
    };
String consumerTag = channel.BasicConsume(queueName, false, consumer);

In this case you'll get the messages in an event-based fashion without blocking the channel.

Other examples can be found here or here. A great reference for messaging in .NET can be found here in Andras Nemes' blog.

VMAtm