2

I'm using https://github.com/videlalvaro/php-amqplib to do some rabbitmq work:

I'm trying to create a blocking version of basic_get, (or a version of basic_consume that I can call repeatedly and only get one msg each time) that will block until a message is ready and then return it instead of returning null if none is on the queue.

When I try to grab a single message with basic_consume things get gummed up and I end up with a bunch of "not ready" but unacked messages. (If I only grab one msg this way, it works every time, if I try to get 2 messages, it gets hung up sometimes and works others)

class Foo {
    ...
    private function blockingGet() {
            /*
                queue: Queue from where to get the messages
                consumer_tag: Consumer identifier
                no_local: Don't receive messages published by this consumer.
                no_ack: Tells the server if the consumer will acknowledge the messages.
                exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
                nowait:
                callback: A PHP Callback
             */
            $this->ch->basic_consume($this->queueName, "consumer_".$this->consumerNum++, false, false, false, false, function($msg) {
                    $this->msgCache = json_decode($msg->body);
                    $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
                    $this->ch->basic_cancel($msg->delivery_info['consumer_tag']);
            });
            while (count($this->ch->callbacks)) {
                    $this->ch->wait();
            }
            return $this->msgCache;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
ben schwartz
  • 2,559
  • 1
  • 21
  • 20
  • I think what you may need here is the `basic_qos` call, which sets how many messages are prefetched by the consumer. I'd have to test, but I suspect a prefetch limit of 1 would fix the side-effects you describe. – IMSoP Feb 16 '16 at 19:08

1 Answers1

0

I've implemented something similar to what you are after here by saving the received message within a closure passed to the callback parameter of the $channel->basic_consume(), and then dealing with it after the $channel->wait() call, as wait() will return control if a message is received (or if the timeout parameter is set and the timeout is reached). Try something like the below:

class Foo {
    // ...
    public function __construct() {
        $this->ch->basic_consume($this->queueName, "", false, false, false, false, function($msg) {
                $this->msgCache = json_decode($msg->body);
                $this->ch->basic_ack($msg->delivery_info['delivery_tag']);
        });
    }
    // ...
    private function blockingGet() {
        $this->ch->wait();
        if ($this->msgCache) {
            $msgCache = $this->msgCache;
            $this->msgCache = null;
            return $msgCache;
        }
        return null;
    }
}

$q = new Foo();
for ($i = 0; $i < 5; $i++) {
    print $q->blockingGet();
}
Benjamin
  • 1,221
  • 11
  • 28