0

I run the producer, it generates N messages, i see them on the dashboard. When I run a receiver it receive all messages from the queue and the queue is an empty.

    static void Receive(string QueName)
    {
        ConnectionFactory connectionFactory = new ConnectionFactory
        {
            HostName = HostName,
            UserName = UserName,
            Password = Password,
        };
        var connection = connectionFactory.CreateConnection();
        var channel = connection.CreateModel();
        channel.BasicQos(0, 1, false);
        MessageReceiver messageReceiver = new MessageReceiver(channel);
        channel.BasicConsume(QueName, false, messageReceiver);
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

// Receiver
 public class MessageReceiver : DefaultBasicConsumer
    {
        private readonly IModel _channel;
        public MessageReceiver(IModel channel)
        {
            _channel = channel;
        }
        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
        {
            Console.WriteLine($"------------------------------");
            Console.WriteLine($"Consuming Message");
            Console.WriteLine(string.Concat("Message received from the exchange ", exchange));
            Console.WriteLine(string.Concat("Consumer tag: ", consumerTag));
            Console.WriteLine(string.Concat("Delivery tag: ", deliveryTag));
            Console.WriteLine(string.Concat("Routing tag: ", routingKey));
            //Console.WriteLine(string.Concat("Message: ", Encoding.UTF8.GetString(body)));

            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(string.Concat("Message: ", message));
            Console.WriteLine($"------------------------------");
            _channel.BasicAck(deliveryTag, false);
        }
    }

I need to have multiple producers which generate messages to the same queue. And multiple customers receive messages from the queue. And messages will be deleted by queue TTL. But now the 1st receiver gets all messages from the queue. How can I do this?

ZedZip
  • 5,794
  • 15
  • 66
  • 119
  • Does this answer your question? [RabbitMQ / AMQP: single queue, multiple consumers for same message?](https://stackoverflow.com/questions/10620976/rabbitmq-amqp-single-queue-multiple-consumers-for-same-message) – possum Nov 29 '21 at 13:46
  • Alas, I did not find answer. I'd like to find a simple receipt how to organize it. – ZedZip Nov 29 '21 at 14:14
  • I have edited my start post - added the Receiver code. – ZedZip Nov 29 '21 at 14:31

2 Answers2

1

The best solution is : every client should have its own queue, may be with TTL, may be with expiration parameter.

ZedZip
  • 5,794
  • 15
  • 66
  • 119
0

We use “exchange” here, just to show the exchange mechanics in same sample, it’s not really needed for the task (check Worker2 project, it works with another queue, which is binded to the same exchange):

channel.ExchangeDeclare(exchange: “logs”, type: ExchangeType.Fanout);

Full sample of consumption

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Timers;

class Worker
{
    public static void Main()    
    {
        // Test of timer handler
        System.Timers.TimeraTimer = new System.Timers.Timer();
        aTimer.Elapsed += new ElapsedEventHandler((source, e) 
                                => Console.Write("Timer Test"));
        aTimer.Interval=3000;
        // Test timer
        // aTimer.Enabled = true;
        
        var factory = new ConnectionFactory()        
        {
            HostName = "localhost", UserName="user", Password="password",
            // DispatchConsumersAsync = true        
        };
        var connection = factory.CreateConnection();
        
        // Add multiple consumers, so that queue can be processed "in
        // parallel"
        for (int i=1; i<10; i++)        
        {
            var j=i;
            var channel = connection.CreateModel();
            
            channel.ExchangeDeclare(exchange: "logs", type: 
                                    ExchangeType.Fanout);
            var queueName=channel.QueueDeclare("test1", durable: true, 
                            autoDelete: false, exclusive: false); 
            
            // take 1 message per consumer
            channel.BasicQos(0, 1, false);
            
            channel.QueueBind(queue: queueName,
                    exchange: "logs",
                    routingKey: "");
            Console.WriteLine($" [*] Waiting for messages in {j}");
            
            var consumer = new EventingBasicConsumer(channel);
            
            consumer. Received+= (model, ea) =>            
            {
                byte[] body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine($" [x] Received in {j} -> {message} at 
                                                    {DateTime.Now}");
                
                // Thread.Sleep(dots * 1000);
                
                // await Task.Delay(3000);
                Thread.Sleep(10000); 
                // async works too
                
                if (j==5)                
                {
                    // Test special case of returning item to queue: in 
                    // this case we received the message, but did not process 
                    // it because of some reason.
                    // QOS is 1, so our consumer is already full. We need 
                    // to return the message to the queue, so that another 
                    // consumer can work with it
                    Console.WriteLine($"[-] CANT PROCESS {j} consumer! 
                                        Error with -> {message}"); 
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, 
                                            multiple: false, true);                
                }
                else                
                {
                    Console.WriteLine($" [x] Done {j} -> {message} at 
                                        {DateTime.Now}");
                    
                    // here channel could also be accessed as 
                    ((EventingBasicConsumer)sender).Model
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, 
                                                multiple: false);                
                }            
            };
            channel.BasicConsume(queue: queueName, autoAck: false, 
                                consumer: consumer);        
        }
        
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();    
    }
}

full example in link

ipodtouch0218
  • 674
  • 1
  • 2
  • 13
Shahram
  • 57
  • 2
  • 7