1

We could make RabbitMQ a distributed priority queue by installing the plugin rabbitmq-priority-queue from https://www.rabbitmq.com/community-plugins.html. I push elements into the queue (each element is pushed with a priority) and I am able to receive the contents of queue in a consumer as desired - higher priority element comes out first.

The issue is that the priority polling concept is not working when this happens continuously:

  1. Run a publisher to populate 3 items with different priorities in a queue.
  2. Consume the messages in the queue - works good - consumes as per priority. now the consumer waits for any message in the queue, as of now queue is empty.
  3. I run the publisher again to populate some 5 elements.
  4. The consumer does not consume the 5 items from the queue in priority, instead it consumes in the order step 3 publisher published it.

What I need is on every poll of the queue item with maximum priority among the entire contents of queue should come out first.

Can anyone tell me what s the bug happening here? Thanks.

Here is the snippet of publisher and consumer (Java):

Publisher

public class RabbitMQPublisher {
    private static final String QUEUE = "my-priority-queue-3";
    public static void main(String[] argv) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        final Connection conn = factory.newConnection();
        final Channel ch = conn.createChannel();
        final Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", 100);
        ch.queueDeclare(QUEUE, true, false, false, args);
        publish(ch, 24);
        publish(ch, 11);
        publish(ch, 75);
        //second run
        //publish(ch, 27);
        //publish(ch, 77);
        //publish(ch, 12);
        conn.close();
    }

    private static void publish(Channel ch, int priority) throws IOException {
        final BasicProperties props = MessageProperties.PERSISTENT_BASIC.builder().priority(priority).build();
        final String body = "message with priority " + priority;
        ch.basicPublish("", QUEUE, props, body.getBytes());
    }

Consumer

while (true) {
        final QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        final String message = new String(delivery.getBody());
        System.out.println(message);
    }

Output:

message with priority 75
message with priority 24
message with priority 11
message with priority 27
message with priority 77
message with priority 12
Dmytro Plekhotkin
  • 1,965
  • 2
  • 23
  • 47
Ranjith
  • 475
  • 6
  • 17
  • Did you bother setting the `basic.qos` at all? Otherwise maybe sleep for a little bit in that while loop to see what happens. – Adam Gent Sep 11 '14 at 20:45
  • Thanks for your reply @Adam Gent. I got the above working by using a basicGet instead of consumer.nextDelivery. final String message = new String(channel.basicGet(QUEUE_NAME, true).getBody()); This retrieves items according to priority from the queue. – Ranjith Sep 12 '14 at 06:58

1 Answers1

0

I was able to solve this using a basicGet to poll the queue instead of consumer.nextDelivery(). final String message = new String(channel.basicGet(QUEUE_NAME, true).getBody()); This pulls the item with highest priority from the queue.

Ranjith
  • 475
  • 6
  • 17