I have one queue with N
messages. Now I would like to create a console application (consumer), that create a given number of tasks/threads and each thread would take 1 message from queue, process it and take a new message... until queue is empty.
Up to this point, this is what I have:
private static void Main(string[] args)
{
var runner = new Runner();
for (var j = 0; j < 5; j++)
{
var task = Task.Factory.StartNew(() => { runner.ReceiveMessageFromServer(); });
}
}
and
public class QRegistrator : IDisposable
{
private const string activeMqUri = "activemq:tcp://localhost:61616";
private readonly IConnection _connection;
private readonly ISession _session;
public QRegistrator(/*string activeMqUri*/)
{
var connectionFactory = new ConnectionFactory(activeMqUri);
_connection = connectionFactory.CreateConnection();
_connection.Start();
_session = _connection.CreateSession();
}
public void Dispose()
{
_connection?.Close();
}
public IMessageConsumer CreateConsumer(string queueName, string topicName)
{
//there have to be new session for each concurrent consumer
var session = _connection.CreateSession();
var queue = new ActiveMQQueue(queueName);
var consumer = string.IsNullOrWhiteSpace(topicName) ? session.CreateConsumer(queue) : session.CreateConsumer(queue, $"topic = '{topicName}'");
return consumer;
}
}
and in class Runner
public void ReceiveMessageFromServer()
{
var consumer = _registrator.CreateConsumer("myQueue", null);
while (true)
{
ITextMessage message = consumer.Receive() as ITextMessage;
if (message == null)
{
break;
}
Console.WriteLine("Received message with ID: " + message.NMSMessageId);
DoSomething();
}
}
But when I run this code, sometimes it does not create a connection, sometimes it does not create a session etc. I really don’t understand. When I try without the for cycle (but still a task()) it behaves the same. When I try without task, and call only runner.ReceiveMessageFromServer()
it works OK. Can anybody please tell me what I am doing wrong?