6

I'm trying to use the Delayed Message Queue for RabbitMQ from PHP, but my messages are simply disappearing.

I'm declaring the exchange with the following code:

$this->channel->exchange_declare(
    'delay',
    'x-delayed-message',
    false,  /* passive, create if exchange doesn't exist */
    true,   /* durable, persist through server reboots */
    false,  /* autodelete */
    false,  /* internal */
    false,  /* nowait */
    ['x-delayed-type' => ['S', 'direct']]);

I'm binding the queue with this code:

$this->channel->queue_declare(
    $queueName,
    false,  /* Passive */
    true,   /* Durable */
    false,  /* Exclusive */
    false   /* Auto Delete */
);
$this->channel->queue_bind($queueName, "delay", $queueName);

And I'm publishing a message with this code:

$msg = new AMQPMessage(json_encode($msgData), [
    'delivery_mode' => 2,
    'x-delay' => 5000]);
$this->channel->basic_publish($msg, 'delay', $queueName);

But the message doesn't get delayed; it's still immediately delivered. What am I missing?

Jesse Weigert
  • 4,714
  • 5
  • 28
  • 37
  • 3
    See the answer here on how to set the delay header: https://groups.google.com/d/msg/rabbitmq-users/vJEG7tdzi4E/lLXF4mhoAAAJ – old_sound Sep 28 '15 at 12:34

3 Answers3

6

From here,

The message creation should be

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$msg = new AMQPMessage($data,
            array(
                'delivery_mode' => 2, # make message persistent
                'application_headers' => new AMQPTable([
                    'x-delay' => 5000
                ])
            )
        );
Toosick
  • 460
  • 1
  • 6
  • 14
4

The answer is for those who need message delaying but does not want to dig into details. You need only a few things to get it working:

Install amqp interop compatible transport for example enqueue/amqp-bunny and enqueue/amqp-tools.

composer require enqueue/amqp-bunny enqueue/amqp-tools

Create amqp context, add a delay strategy and send delayed messages:

<?php
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpBunny\AmqpConnectionFactory;

$context = (new AmqpConnectionFactory('amqp://'))->createContext();
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy())

$queue = $context->createQueue('foo');
$context->declareQueue($queue);

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($queue, $message)
;

By the way, this not the only strategy available. there is one based on RabbitMQ dead letter queues + ttl. It could be used the same way.

Maksim Kotlyar
  • 3,821
  • 27
  • 31
1

you need a routing key to publish from the exchange to the queue in question.

the reason publishing to the built-in direct exchange works, is because this exchange is a special case that uses the routing key as the destination queue name.

for all exchanges and queues that you create, you need to creating a binding between the exchange and the queue, with a routing key. then you publish the message with that routing key instead of the destination queue name.

i don't know the PHP code to create the binding... but it generally looks something like this:

channel.bind(exhange_name, queue_name, routing_key)

then in your publishing of the message:

$this->channel->basic_publish($msg, 'delay', $routing_key);

Derick Bailey
  • 72,004
  • 22
  • 206
  • 219
  • Thanks. I think I'm getting closer to making this work. – Jesse Weigert Aug 31 '15 at 20:06
  • Thank you. The message is getting delivered through the plugin, but it's not accepting my delay parameter. I've updated my question. – Jesse Weigert Aug 31 '15 at 20:25
  • i think you may need to set the "x-delay" in "properties->headers" not in "properties" directly. try that and see if it works – Derick Bailey Sep 01 '15 at 02:34
  • It appears the AMQP library is filtering out that header. :-( – Jesse Weigert Sep 01 '15 at 18:45
  • I'm going to give you the correct answer for this. Basically, the AMQP library is broken and I can't pass in custom headers to RabbitMQ. I ended up just storing the jobs in a database and have a task which dispatches them when necessary. – Jesse Weigert Sep 03 '15 at 00:16
  • 1
    that's a bummer :( maybe open a ticket w/ the library in their issues list? – Derick Bailey Sep 03 '15 at 02:54
  • 2
    This is not an issue with the library. See the answer here on how to set the delay header: https://groups.google.com/d/msg/rabbitmq-users/vJEG7tdzi4E/lLXF4mhoAAAJ – old_sound Sep 28 '15 at 12:34