1

I am using Akka (latest stable version), akka-camel and JMS (for the purposes of this conversation, let's say it's ActiveMQ, but ideally the solution should be generic).

The use-case

I have the following use-case. On a queue Q I receive messages like this:

time:   1     2    3    4    5    6

      | A1 | B1 | C1 | C2 | A2 | B2 | .... 
         ^                        ^
       first                     latest

My end goal is to pair them together in (A1,A2), (B1,B2), and so on; notwithstanding complications like duplicates and undelivered messages, the complication is that I have to ensure that, until a whole pair is matched and processes, the broker will keep hold of all unacknowledged messages.

Example

At 4 I received and processed 4 messages, and processed successfully pair (C1, C2), however I still cannot acknowledge anything back to the broker because A1 and B1 are still unmatched and pending, and in JMS to acknowledge C2 means to acknowledge all messages up to C2. As a matter of fact, the first acknowledgement I can send back is at time 5, when A2 is received: at this point I can acknowledge A1 (and only A1, as B1 is still pending).

The problem

Now, what I can't seem to figure out is how to do this kind of delayed and async acknowledgement via akka-camel. I have been reading online and, while I can find explanations on how to acknowledge messages manually (docs and an example), there is nothing showing how to acknowledge a previously processed message to the broker.

import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure

class Consumer3 extends Consumer {
  override def autoAck = false

  def endpointUri = "jms:queue:test"

  def receive = {
    case msg: CamelMessage =>
      sender() ! Ack
      // on success
      // ..
      val someException = new Exception("e1")
      // on failure
      sender() ! Failure(someException)
  }
}

In this case, Ack is an object and its semantic is really just: I acknowledge the current message while I would need something like Message X is now acknowledged, where X is some previous message, but not necessarily the current one.

Is this use-case supported or supportable via akka-camel or should I just build it myself?

Thanks

Community
  • 1
  • 1
mdm
  • 3,928
  • 3
  • 27
  • 43

1 Answers1

2

It sounds like what you want is activemq's ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE.

Sadly INVIDUAL_ACKNOWLEDGE is not part of the JMS spec, but according to this post that behvaiour it is widely adopted on a per queue basis. (I haven't actually checked)

Since it's not on spec, akka doesn't support it out of the box, but it sure would make a good contribution!

melps
  • 1,247
  • 8
  • 13