9

We have a standalone java application doing some background processing on a Debian machine. The jobs it has to handle are send through RabbitMQ messages.

When the java application needs to be upgraded we need to stop it (kill it). But we have to be sure that no consumers are currently processing a message. What is, in your experience, the best way to achieve this?

We tried to send a 'SHUTDOWN' message to a consumer, but we can't seem to close the queue or channel?! It freezes the application! Or is there another solution where we could for example auto shutdown the application, without doing the kill command in linux?

Thx to share you experience.

Greetings

  • 1
    You can use durable messages and in that case you can kill it when ever you want because messages coming back or going out for that matter will not be lost . That loose coupling / asynchronous com is one of the many reasons people use MQs . – Shahzeb Jul 15 '11 at 00:25
  • 1
    @Shahzeb Yes we use durable messages and for some of the jobs we can simply kill it and upon restart it reprocesses the messages. But 1 particular job is doing some business with an external api and it must not be interrupted because once in the process it is not possible to rerun it on the same request. – Toni Van de Voorde Jul 15 '11 at 03:55

2 Answers2

4

The RabbitMQ Java libraries do not provide (AFAIK) anything which would automatically postpone the shutdown of a consumer process while some messages are still being processed. Therefore you'll have to do this yourself.

If your application can tolerate it, just shut down. Any message which had not been acknowledged at that point will remain on the queue within the broker and will be re-delivered when the consumer comes back up.

If you can't tolerate that and you absolutely must ensure that all in-progress message processing finishes, then you need to follow advice similar to what's found in this answer and do the following in your shutdown handler:

  1. Set an "all threads should exit" flag to true
  2. Join with each of the threads in your thread pool
  3. Exit gracefully

That implies that each of your message processing threads (assuming you have multiple threads processing messages concurrently) need to follow this general pattern:

  1. Pull a message from the queue and process it
  2. Ack the message which was just processed
  3. If the "all threads should exit" flag is true, exit the thread function
  4. Rinse, repeat

Hope that helps.

Community
  • 1
  • 1
Brian Kelly
  • 19,067
  • 4
  • 53
  • 55
  • Thx for your insights. I do understand the theory but the way we use RabbitMQ we don't specifically create multiple threads. Its all done through creation of multiple channels. So I don't really see/understand how I could implement this. But I'll continue looking based on your proposal. Thx again. – Toni Van de Voorde Jul 15 '11 at 07:39
  • What if you close all your channels within your shutdown handler (in addition to waiting for all the in-progress messages to be processed as described above)? Wouldn't that allow you to shutdown gracefully? – Brian Kelly Jul 15 '11 at 18:52
  • Hi Brian. I did find a working solution. Your post was very helpful for this but I couldn't find the time yet to post the solution. I hope I will be able to do it this week! – Toni Van de Voorde Jul 19 '11 at 07:24
0

Here's my take on it.

I created my subclass of DefaultConsumer (BasicConsumer) which provides isCancelled() and implements handleCancelOk() which sets a "cancelled flag" to true.

Start sequence:

consumers = new ArrayList<BasicConsumer>();
consumers.add(...)

Stop sequence:

// Cancel all consumers
for (BasicConsumer consumer : consumers) {
  try {
    consumer.getChannel().basicCancel(consumer.getConsumerTag());
  } catch (Exception e) {
    // report
  }
}

// Wait for all consumers to be cancelled
Timeout timeout = ...;
while (!consumers.isEmpty() && !timeout.isElapsed()) {
  // Remove cancelled consumers
  for (Iterator<BasicConsumer> iterator = consumers.iterator(); iterator.hasNext();) {
    if (iterator.next().isCancelled())
      iterator.remove();
  }
}

// Here we could force-close the remaining timed-out consumers if we
// used our own ExecutorService by shutting down all of its threads.
connection.close();

Relevant RabbitMQ ML thread: How to shutdown cleanly a Java application using Consumers?

bernie
  • 9,820
  • 5
  • 62
  • 92