4

I want to log working status from workers' callbacks and include a number of messages in the queue left.

The only solution I found so far is getting the second member of queue_declare result array, but this should be called once per worker launch, and I need info to be updated every new message.

UPD: Solution based on IMSoP's answer:

<?php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('test1');
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) use ($channel) {
    list (, $cn) = $channel->queue_declare('test1', true);
    echo ' [x] Received ', $msg->body, " $cn left";
    for ($i = 0; $i < $msg->body; ++$i) {
        sleep(1);
        echo '.';
    }
    echo "\n";
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('test1', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}

For some reason always gives 0 as message count.

13DaGGeR
  • 147
  • 1
  • 10
  • It might be better to record this separately then, you could increment/decrement a counter in memcached or redis. – fire Apr 15 '19 at 08:57
  • 1
    I just noticed that your title says "PhpAmqp" but your tags include "php-amqplib". There are two different AMQP libraries for PHP, one is [an extension](https://pecl.php.net/package/amqp), the other [a PHP implementation](https://github.com/php-amqplib/php-amqplib). The features are pretty much the same, but the classes and functions are named differently, so you might want to clarify which you are using. – IMSoP Apr 15 '19 at 14:17
  • I think I found the reason. You will have to set the noAck to false in `basic_consume` and manually do ack in message handling in order to get the correct messageCount. The library will ignore the qos prefetch_count setting if noAck is set to true. – cr001 Sep 22 '21 at 04:19

1 Answers1

5

The queue_declare method has a parameter called "passive" which can be used for this purpose: it checks if the queue exists, by name only, and ignores any other parameters.

According to the AMQP documentation:

If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. The client can use this to check whether a queue exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.

Note that Declare-Ok is not just a status, but the name of the full response structure, with fields queue, message-count, and consumer-count.

In PHP-AMQPLib, you can use this to log the status of a set of queues something like this:

foreach ( $this->registeredQueues as $queueName ) {
    // The second parameter to queue_declare is $passive
    // When set to true, everything else is ignored, so need not be passed
    list($queueName, $messageCount, $consumerCount)
        = $this->rabbitChannel->queue_declare($queueName, true);

    $this->logger->info(
        "Queue $queueName has $messageCount messages and $consumerCount active consumers."
    );
}
IMSoP
  • 89,526
  • 13
  • 117
  • 169
  • 2
    For some reason always gives 0 as message count. I updated question with code example. – 13DaGGeR Apr 15 '19 at 15:14
  • I am having the exact same problem as the passive true result always give `$messageCount === 0`, despite `rabbitmqctl list_queues` giving positive number. It's hard to imagine something this important is not supported. – cr001 Sep 20 '21 at 12:42
  • I think I found the reason. You will have to set the noAck to false in `basic_consume` and manually do ack in message handling in order to get the correct messageCount. – cr001 Sep 22 '21 at 03:41