51

Is it possible to send message via RabbitMQ with some delay? For example I want to expire client session after 30 minutes, and I send a message which will be processed after 30 minutes.

alex
  • 511
  • 1
  • 4
  • 4

8 Answers8

31

There are two approaches you can try:

Old Approach: Set the TTL(time to live) header in each message/queue(policy) and then introduce a DLQ to handle it. once the ttl expired your messages will move from DLQ to main queue so that your listener can process it.

Latest Approach: Recently RabbitMQ came up with RabbitMQ Delayed Message Plugin , using which you can achieve the same and this plugin support available since RabbitMQ-3.5.8.

You can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay expressing in milliseconds a delay time for the message. The message will be delivered to the respective queues after x-delay milliseconds

byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new 
AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

More here: git

Pang
  • 9,564
  • 146
  • 81
  • 122
lambodar
  • 3,495
  • 5
  • 34
  • 58
  • 7
    The Delayed Message Plugin has a limitation on total delayed message count in queue since it it using Mnesia to store those delayed messages (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72) – henrylilei Aug 14 '19 at 13:26
  • The "delayed exchange plugin" is the most elegant solution but you have to be careful before to adopt it in a production-grade project. For the limitations mentioned by @henrylei it's possible to loose messages, since messages are stored without redundancy before to be actually published in the exchange. For heavy loads and critical features, the "fake" dead letter is still the most accountable alternative – Carmine Ingaldi Mar 07 '21 at 09:42
14

With the release of RabbitMQ v2.8, scheduled delivery is now available but as an indirect feature: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Jonathan Oliver
  • 5,207
  • 32
  • 31
  • I tried this approach but hit a few issues, suggestions any one? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 – Andreas Öhlund Jan 15 '13 at 13:46
  • 7
    I did a spike and hit a few showstoppers: 1. Messages are only DLQ:en when at the top of the Q (http://www.rabbitmq.com/ttl.html – Caveats section) This means that if I first set msg 1 to expire in 4 hours and msg2 to expire in 1 hours msg2 will only expire after msg1 has expired. 2. The TTL for the message is kept by Rabbit so lets say you use a short timeout of 10 s. If the consumer hasn’t been able to consume the message withing 10 seconds after it expired (due to a backlog) it will be discarded and lost The above has been verified with Rabbit 3.0.1 Do you guys see any workarounds? – Andreas Öhlund Jan 16 '13 at 14:51
12

Thanks to Norman's answer, I could implement it in Node.js.

Everything is pretty clear from the code.

var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});

// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
      deadLetterExchange: "my_final_delayed_exchange",
      messageTtl: 5000, // 5sec
}, function (err, q) {
      ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
      ch.bindQueue(q.queue, "my_final_delayed_exchange", '');

      ch.consume(q.queue, function (msg) {
          console.log("delayed - [x] %s", msg.content.toString());
      }, {noAck: true});
});
Pang
  • 9,564
  • 146
  • 81
  • 122
walv
  • 2,680
  • 3
  • 31
  • 36
11

As I don't have enough reputation to add comment, posting a new answer. This is just an addition to what has been already discussed at http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Except instead of setting ttl on messages, you can set it at queue level. Also you can avoid creating a new exchange just for the sake of redirecting the messages to different Queue. Here is sample Java code:

Producer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class DelayedProducer {
    private final static String QUEUE_NAME = "ParkingQueue";
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 10000);
        arguments.put("x-dead-letter-exchange", "");
        arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME );
        channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

        for (int i=0; i<5; i++) {
            String message = "This is a sample message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("message "+i+" got published to the queue!");
            Thread.sleep(3000);
        }

        channel.close();
        connection.close();
    }
}

Consumer:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {
   private final static String DESTINATION_QUEUE_NAME = "DestinationQueue";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        boolean autoAck = false;
        channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
Pang
  • 9,564
  • 146
  • 81
  • 122
Sateesh
  • 735
  • 8
  • 10
  • Thank You very much. I think you you have a small mistake in the consumer queue declare channel.queueDeclare(QUEUE_NAME, false, false, false, null); It should have the "DESTINATION_QUEUE_NAME" instead of "QUEUE_NAME". Really thank you very very much – Nour Lababidi Sep 19 '16 at 21:45
8

It looks like this blog post describes using the dead letter exchange and message ttl to do something similar.

The code below uses CoffeeScript and Node.js to access Rabbit and implement something similar.

amqp   = require 'amqp'
events = require 'events'
em     = new events.EventEmitter()
conn   = amqp.createConnection()
  
key = "send.later.#{new Date().getTime()}"
conn.on 'ready', ->
  conn.queue key, {
    arguments:{
      "x-dead-letter-exchange":"immediate"
    , "x-message-ttl": 5000
    , "x-expires": 6000
    }
  }, ->
    conn.publish key, {v:1}, {contentType:'application/json'}
  
  conn.exchange 'immediate'
 
  conn.queue 'right.now.queue', {
      autoDelete: false
    , durable: true
  }, (q) ->
    q.bind('immediate', 'right.now.queue')
    q.subscribe (msg, headers, deliveryInfo) ->
      console.log msg
      console.log headers
Pang
  • 9,564
  • 146
  • 81
  • 122
Norman H
  • 2,248
  • 24
  • 27
6

That's currently not possible. You have to store your expiration timestamps in a database or something similiar, and then have a helper program that reads those timestamps and queues a message.

Delayed messages are an often requested feature, as they're useful in many situations. However, if your need is to expire client sessions I believe that messaging is not the ideal solution for you, and that another approach might work better.

Alessandro
  • 1,336
  • 8
  • 15
1

Suppose you had control over the consumer, you could achieve the delaying on the consumer like this??:

If we are sure that the nth message in the queue always has a smaller delay than the n+1th message (this can true for many use cases): The producer sends timeInformation in the task conveying the time at which this job needs to be executed (currentTime + delay). The consumer:

1) Reads the scheduledTime from the task

2) if currentTime > scheduledTime go ahead.

Else delay = scheduledTime - currentTime

sleep for time indicated by delay

The consumer is always configured with a concurrency parameter. So, the other messages will just wait in the queue until a consumer finishes the job. So, this solution could just work well though it looks awkward especially for big time delays.

Swami
  • 31
  • 4
0

AMQP protocol does not support delayed messaging, but by using Time-To-Live and Expiration and Dead Letter Exchanges extensions delayed messaging is possible. The solution is described in this link. I copied the following steps from that link:

Step by step:

Declare the delayed queue
    Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
    Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
    Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
Subscribe to the destination queue

There is also a plugin for delayed messaging in RabbitMQ repository on GitHub.

Note that there is a solution called Celery which supports delayed task queuing on RabbitMQ broker by presenting a calling API called apply_async(). Celery supports Python, node and PHP.

Pouya Esmaeili
  • 1,265
  • 4
  • 11
  • 25