
How do I implement a mechanism that rejects a message after a configurable number of requeue attempts?

In other words, when I subscribe to a queue, I want to guarantee that the same message is not redelivered more than X times.

My code sample:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  try{
    doSomething(data);
  } catch(e) {
    message.reject(true); // true: requeue the message
  }
});
shaiis.com
  • Possible duplicate http://stackoverflow.com/q/17654475 – pinepain Feb 13 '14 at 21:10
  • How are you identifying messages uniquely? Do you have your own ID in the message payload? – Brian Kelly Feb 18 '14 at 02:58
  • In my case (but I'm not the OP) - yes, messages can be identified by their UUID. Alas, it's not enough to have a simple counter in the subscriber, as I have multiple subscribers to the same queue to balance work, and the number of retries should be global, not local to each worker. – Golo Roden Feb 18 '14 at 08:02

2 Answers


In my opinion, the best solution is to handle these errors in your application and reject the message once the app has decided that it can't process it.

If you don't want to lose information, the app should reject the message only after it has sent the same message to an error queue.

The code is not tested:

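q.subscribe({ack: true}, function () {
  var args = arguments;
  var self = this;
  // doWork is assumed to return a promise that supports .fail() (e.g. Q)
  var promise = doWork.apply(self, args);
  // Chain up to MAX_RETRIES further attempts; each runs only if the previous one failed
  for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) {
    promise = promise.fail(function () { return doWork.apply(self, args); });
  }

  // All attempts failed: keep a copy in the error queue, then reject
  promise.fail(function () {
    sendMessageToErrorQueue.apply(self, args);
    rejectMessage.apply(self, args);
  });
});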
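
The idea of retrying inside the application and forwarding to an error queue before rejecting can also be sketched without promises. This is only a minimal sketch, assuming the work is synchronous and signals failure by throwing; handleWithRetries and its sendToErrorQueue and reject parameters are hypothetical names, not part of any library:

```javascript
// Hypothetical helper: run `work` up to 1 + maxRetries times in-process.
// If every attempt throws, forward the message to an error queue first,
// and only then reject it without requeueing.
function handleWithRetries(data, work, maxRetries, sendToErrorQueue, reject) {
  var lastError;
  for (var attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      work(data);
      return { ok: true, attempts: attempt + 1 };
    } catch (e) {
      lastError = e; // remember the failure and try again
    }
  }
  sendToErrorQueue(data, lastError); // keep a copy before giving up
  reject(false);                     // false: do not requeue
  return { ok: false, attempts: maxRetries + 1 };
}
```

In a subscriber like the one in the question, reject could be a small wrapper such as function (requeue) { message.reject(requeue); }, and the caller would still need to acknowledge the message when the result is ok. Because the broker never sees the intermediate failures, the retry limit holds no matter how many workers consume from the queue.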
Noam

One possible solution is to hash the message using a hash function you define, then check a cache object for that hash. If it is there, increment its counter up to the configurable maximum; if it's not there, create a counter for it. Here's a quick and dirty prototype (note that the mcache object should be in scope for all subscribers):

var mcache = {}, maxRetries = 3;

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  var messagehash = hash(message);
  if(mcache[messagehash] === undefined){
    mcache[messagehash] = 0;
  }
  if(mcache[messagehash] > maxRetries) {
    q.shift(true,false); //reject true, requeue false (discard message)
    delete mcache[messagehash]; //don't leak memory
  } else {
    try{
      doSomething(data);
      q.shift(false); //reject false
      delete mcache[messagehash]; //don't leak memory
    } catch(e) {
      mcache[messagehash]++;
      q.shift(true,true); //reject true, requeue true
    }
  }
});

If the message has a GUID, you can simply return that from the hash function.
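
The counting logic can be separated from the queue calls so that it is easier to test. The following is a minimal sketch of that idea, assuming each message carries a unique ID; createRetryTracker and its methods are hypothetical names. Unlike the prototype above, it decides at failure time rather than on the next delivery:

```javascript
// Hypothetical in-memory retry tracker keyed by message ID.
// Assumption: the counts live in a single process, so they are NOT shared
// between workers running on different machines.
function createRetryTracker(maxRetries) {
  var counts = Object.create(null); // no inherited keys such as "constructor"
  return {
    // Call after a failed attempt; returns "requeue" or "discard".
    onFailure: function (id) {
      counts[id] = (counts[id] || 0) + 1;
      if (counts[id] > maxRetries) {
        delete counts[id]; // don't leak memory
        return "discard";
      }
      return "requeue";
    },
    // Call after a successful attempt to forget the message.
    onSuccess: function (id) {
      delete counts[id];
    },
    // Number of messages currently being tracked.
    pending: function () {
      return Object.keys(counts).length;
    }
  };
}
```

In the subscriber, a "requeue" result would map to q.shift(true, true) and a "discard" result to q.shift(true, false). With maxRetries set to 3, a message that always fails is attempted four times in total: the first delivery plus three retries.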

gcochard
  • This introduces the cache as single point of failure. I'd like to have a solution completely using RabbitMQ techniques. – Golo Roden Feb 18 '14 at 19:52
  • In what scenario do you envision the cache failing? – gcochard Feb 18 '14 at 22:49
  • The cache (physically) needs to run on some server. What if this server goes down? So you need the cache to be highly available and fail-safe, and this increases the required effort. If there was any possibility to use RabbitMQ itself for it, I'd highly favor such a solution. – Golo Roden Feb 19 '14 at 08:55
  • Definitely something that causes concern, but if your receiver goes down, do you really want the messages to be dropped completely? Setting `ack` to true implies the messages need to be received. – gcochard Feb 20 '14 at 16:36