0

I have the use case that I need to wait 2 hours before consuming messages from an AMQP (we use Rabbit) queue.

EDIT: To clarify my use case... I need each individual message to wait 2 hours before being read. E.g. Message 1 arrives at 10am and Message 2 arrives at 10:15. I need Message 1 to be read at 12p and Message 2 to be read at 12:15p.

We are using Spring Integration 3.x.

The int-amqp:inbound-channel-adapter is message driven and doesn't have a polling option from what I can find.

A couple things I've thought of:

Any suggestions?

Community
  • 1
  • 1
Ryan Walls
  • 6,962
  • 1
  • 39
  • 42

3 Answers3

1

We don't currently have a polling inbound adapter. #1 is easy. For #2, the simplest would be to use a RabbitTemplate and invoke receive() from an inbound-channel-adapter in a POJO.

I would go with #1; you don't need quartz, you can use a simple Spring scheduled task and a control bus to start the adapter.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Thanks for the info. After rethinking over the weekend, I'm not sure either solution would solve my problem though. I didn't state my use case very well. I need to have each individual message wait 2 hours. I'm thinking options 3 or newly added option 4 in my question may be the best solution. – Ryan Walls Jun 09 '14 at 17:05
  • 1
    For that use case (every message delayed 2 hours) then option #3 is the way to go; I would not introduce a delayer. – Gary Russell Jun 09 '14 at 18:04
1

Another trick is about to use PollableAmqpChannel:

<int-amqp:channel id="myQueueName" message-driven="false"/>

and provide the <poller> for the subscriber to that channel.

There is no reason to send messages to that channel (because you will poll messages from Rabbit Queue) and, right, it looks like anti-pattern, but it is a hook how to avoid any workarounds with direct RabbitTemplate usage via SpEL.

UPDATE

<delayer> can help you, but it depends of your requirements. If you don't want to poll messages from RabbitMQ, you should use the workaround above. But if you just don't want to process message until some time is elapsed, you can just 'delay' it for that time.

Don't forget to add persistent message-store to avoid losing messages during that period and unexpected application failure.

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Thanks. That's a good workaround. The one problem now...I'm not sure that a pollable channel would work for me because if I poll every 2 hours for only 1 message, the queue could get really backed up. Updated my question to clarify my use case. – Ryan Walls Jun 09 '14 at 16:59
  • Added comments on the matter – Artem Bilan Jun 09 '14 at 17:20
0

FYI, how I solved the issue. (Used solution #3).

<rabbit:queue name="delayQueue" durable="true">
  <rabbit:queue-arguments>
    <entry key="x-message-ttl">
       <value type="java.lang.Long">7200000</
     </entry>
     <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
     <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
  </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue" pattern="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:topic-exchange>
Ryan Walls
  • 6,962
  • 1
  • 39
  • 42