I have a Producer sending messages to a topic exchange. Each message contains a routing key. (Apologies for the primitive diagram)
        P
        |
        X
      /| |\
     / | | \
    /  | |  \
  Q1  Q2 Q3  Q4
  |   /  /   /
  |  /  /   /
  | /  /   /
  |/  /   /
  C
I'm using php-amqplib and am attempting to consume from a number of queues. What I am trying to achieve is to check each queue in sequence: if it has a message, process it; otherwise, move on to the next queue. Whenever a message is found, the checking should start again from Q1. The following code doesn't work but will demonstrate the logic of what I want to do.
$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
$channel->exchange_declare('myexchange', 'topic', false, false, false);
$channel->queue_declare("Q1", false, true, false, false);
$channel->queue_bind("Q1", 'myexchange', 'priority.1');
$channel->queue_declare("Q2", false, true, false, false);
$channel->queue_bind("Q2", 'myexchange', 'priority.2');
$channel->queue_declare("Q3", false, true, false, false);
$channel->queue_bind("Q3", 'myexchange', 'priority.3');
$channel->queue_declare("Q4", false, true, false, false);
$channel->queue_bind("Q4", 'myexchange', 'priority.4');
$queues = array('Q1','Q2','Q3','Q4');
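$consumer_tag = 'consumer';
$priority = 0; // index into $queues; 0 is Q1, the highest priority
while (1) {
    // Intended: fetch one message from the current queue, if there is one
    $msg = $channel->basic_consume($queues[$priority], $consumer_tag, false, false, false, false);
    if (isset($msg->body)) {
        echo ' [x] ', $msg->delivery_info['routing_key'], "\n";
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
        // A message was found, so start checking again from Q1
        $priority = 0;
    } else {
        // Nothing here: move on to the next queue, wrapping back to Q1 after Q4
        $priority = ($priority + 1) % count($queues);
    }
}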
$channel->close();
$connection->close();
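One possible way to get this behaviour is to poll with `basic_get` rather than `basic_consume`: as far as I know, `basic_consume` in php-amqplib only registers a callback-based consumer and returns the consumer tag, while `basic_get` fetches a single message and returns `null` when the queue is empty. Below is a minimal sketch of the polling logic only, under a few assumptions: `pollByPriority`, `$fetch`, `$handle` and `$maxIdlePasses` are hypothetical names, not part of php-amqplib, and the in-memory `$pending` array stands in for the broker so that the snippet runs on its own.

```php
<?php
/**
 * Poll queues in strict priority order (hypothetical helper, not a library API).
 *
 * $fetch($queue) must return a message, or null if the queue is empty.
 * $handle($queue, $msg) processes (and, against a real broker, acks) the message.
 * The loop stops after $maxIdlePasses full passes that found nothing.
 */
function pollByPriority(array $queues, callable $fetch, callable $handle, int $maxIdlePasses = 1): int
{
    $handled = 0;
    $idlePasses = 0;
    while ($idlePasses < $maxIdlePasses) {
        $found = false;
        foreach ($queues as $queue) {
            $msg = $fetch($queue);
            if ($msg !== null) {
                $handle($queue, $msg);
                $handled++;
                $found = true;
                break; // a message was found: restart from the first queue
            }
        }
        $idlePasses = $found ? 0 : $idlePasses + 1;
    }
    return $handled;
}

// In-memory stand-in for the four queues (assumption, for demonstration only).
$pending = ['Q1' => ['a'], 'Q2' => ['b', 'c'], 'Q3' => [], 'Q4' => ['d']];
$order = [];

$fetch = function (string $queue) use (&$pending) {
    return array_shift($pending[$queue]); // null when the queue is empty
};
$handle = function (string $queue, $msg) use (&$order) {
    $order[] = "$queue:$msg";
};

$handled = pollByPriority(['Q1', 'Q2', 'Q3', 'Q4'], $fetch, $handle);
echo implode(', ', $order), "\n";
```

With a real channel, `$fetch` might be `function ($queue) use ($channel) { return $channel->basic_get($queue); }`, with `$handle` calling `basic_ack` on the delivery tag as in the loop above. A long-running worker would probably also want a short sleep after an idle pass so that it does not hammer the broker.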