2

I created a small demo to show the RabbitMQ basics. Unfortunatly it doesn't work as expected an has two issues. I am using .NET Core 3.1 and RabbitMQ.Client 6.2.2

I created the Employee class which receives messages from the task queue. The first employee is working nice but if I start more employees they don't work (don't receive messages). And I can't figure out why that would be.

And if I have a lot of messages in the queue (before starting the second employee) I see that all messages in the tasks queue get ACKed when the second starts and then after a short time they become UNACKed again. Somehow weird.

But mainly: why do the other employees not work?

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace DemoTasks.Employee
{

  class Employee
  {
    static void Main(string[] args)
    {

      string clientName = "Employee-" + Guid.NewGuid().ToString();

      Console.Title = clientName;
      Console.WriteLine("Moin moin");

      IConnectionFactory connectionFactory = new ConnectionFactory
      {
        HostName            = "localhost",
        Port                = 5672,
        VirtualHost         = "/",
        UserName            = "user",
        Password            = "password",
        ClientProvidedName  = clientName
      };

      using (IConnection connection = connectionFactory.CreateConnection(clientName))
      {
        using (IModel model = connection.CreateModel())
        {

          model.ExchangeDeclare("jobs", "fanout", false, false, null);
          model.QueueDeclare("tasks", true, false, false);
          model.QueueBind("tasks", "jobs", "", null);

          EventingBasicConsumer consumer = new EventingBasicConsumer(model);
          consumer.Received += OnWorkReceived;

          model.BasicConsume("tasks", false, clientName + ":OnWorkReceived", consumer);

          Console.ReadLine();

          model.Close();
        }
        connection.Close();
      }

      Console.WriteLine("Wochenende ... woooh !!!");
    } 

    private static void OnWorkReceived(object sender, BasicDeliverEventArgs e)
    {

      EventingBasicConsumer consumer = (EventingBasicConsumer)sender;
      IModel model  = consumer.Model;

      string task = Encoding.UTF8.GetString(e.Body.ToArray());

      Console.Write("working on: " + task + " ... ");

      Thread.Sleep(5000);

      Console.WriteLine("done!");
              
      model.BasicAck(e.DeliveryTag, false);
    }

  } 

}
monty
  • 7,888
  • 16
  • 63
  • 100
  • 1
    See https://stackoverflow.com/questions/10620976/rabbitmq-amqp-single-queue-multiple-consumers-for-same-message or https://stackoverflow.com/questions/42351130/can-we-have-multiple-subscribers-for-rabbitmq-queue - either the question or an answer will probably give you some inspiration. Might be helpful to say what you want the behavior of the secondemployee to be - do all employees process the same message or do they share/roundrobin – Caius Jard Jun 24 '21 at 05:50

1 Answers1

2

I think your problem is about setting PrefetchCount on your channel. It's about how many messages that one consumer can get from rabbit and cache them on itself to process them.

If don't set it, one consumer can consume all messages on queue and no time to get messages by other consumers, so you can set it by using channel.basicQos(1) or basicqos(0,1,false). By this setting every consumer can get one message after send ack to rabbit then can get another one.

When set prefetch count to lower number, can affect on performance because your consumer must ask rabbit more to get messages .

For detail information see this: https://www.rabbitmq.com/consumer-prefetch.html

sa-es-ir
  • 3,722
  • 2
  • 13
  • 31