2

I have a Producer sending messages to a topic exchange. Each message contains a routing key. (Apologies for the primitive diagram)

       P
       |
       X
     /| |\
    / | | \
   /  | |  \
   Q1 Q2 Q3 Q4
   |   / /  /
   |  / /  /
   | / /  /
   |/ /  /
      C

I'm using php-amqplib and am attempting to consume a number of queues. What I am trying to achieve is to test each queue sequentially, see if it has a message and, if so, process it, otherwise, move on to the next queue. Also, if a message has been found, start the checking process again from Q1. The following code doesn't work but will demonstrate the logic of what I want to do.

$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

$channel->exchange_declare('myexchange', 'topic', false, false, false);

$channel->queue_declare("Q1", false, true, false, false);
$channel->queue_bind("Q1", 'myexchange', 'priority.1');

$channel->queue_declare("Q2", false, true, false, false);
$channel->queue_bind("Q2", 'myexchange', 'priority.2');

$channel->queue_declare("Q3", false, true, false, false);
$channel->queue_bind("Q3", 'myexchange', 'priority.3');

$channel->queue_declare("DFQ4", false, true, false, false);
$channel->queue_bind("DFQ4", 'myexchange', 'priority.4');

$queues = array('Q1','Q2','Q3','Q4');

$priority = 0;
while (1) {

    $priority = ($priority<4)? $priority+1 : 0;

    $msg = $channel->basic_consume($queues[$priority], $consumer_tag, false, false, false, false);
    if(isset($msg->body)) {
        echo ' [x] ',$msg->delivery_info['routing_key'], "\n";
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
        $priority = 0;
    }
}

$channel->close();
$connection->close();
agnitio
  • 137
  • 1
  • 10
  • I don't understand why you would want to check the queues again instead of processing a message when you receive it. And I don't understand why you would need priority when you can just declare any number of consumers you want and process everything in parallel. – Kethryweryn Sep 16 '13 at 16:59
  • A message in Q1 should be processed before one in Q2, before one in Q3 and so on. Say, at one point in time, there were messages only in Q4. One would be processed, then a check made to see if any messages had appeared on Q1, Q2 & Q3 before attempting to process the next message on Q4. Jobs on Q1 need to be done first, hence the need for priority. Doug Barth described a possible solution [http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html] – agnitio Sep 17 '13 at 08:36
  • Have you tried your code with [basic_get](https://github.com/videlalvaro/php-amqplib/blob/master/demo/basic_get.php) instead of basic_consume ? – Kethryweryn Sep 17 '13 at 09:02

1 Answers1

0

I had the same requirement for my .NET projects. Basically, I had the similar approach but I used header exchange to feed published messages to those queues. The library I created subscribe to all those queues (with same prefetch size value) and put all those messages to an internal memory queue where those messages can be sorted by their priorities. So at the end of the pipeline, the actual code to process messages will read them from the internal queue, messages with higher priority will be processed and then acked before the lower priority one.

Van Thoai Nguyen
  • 986
  • 9
  • 22