I am working on implementing Akka Alpakka for consuming from and producing to ActiveMQ queues, in Java. I can consume from the queue successfully, but I haven't yet been able to implement application-level message acknowledgement.
My goal is to consume messages from a queue and send them to another actor for processing. When that actor has completed processing, I want it to be able control the acknowledgement of the message in ActiveMQ. Presumably this would be done by sending a message to another actor that can do the acknowledgement, calling an acknowledge function on the message itself, or some other way.
In my test, 2 messages are put into the AlpakkaTest queue, and then this code attempts to consume and acknowledge them. However, I don't see a way to set the ActiveMQ session to CLIENT_ACKNOWLEDGE, and I don't see any difference in behavior with or without the call to m.acknowledge();
. Because of this, I think messages are still being auto-acknowledged.
Does anyone know the accepted way to configure ActiveMQ sessions for CLIENT_ACKNOWLEDGE and manually acknowledge ActiveMQ messages in Java Akka systems using Alpakka?
The relevant test function is:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2999"); // An embedded broker running in the test.
Source<Message, NotUsed> jmsSource = JmsSource.create(
JmsSourceSettings.create(connectionFactory)
.withQueue("AlpakkaTest")
.withBufferSize(2)
);
Materializer materializer = ActorMaterializer.create(system); // `system` is an ActorSystem passed to the function.
try {
List<Message> messages = jmsSource
.take(2)
.runWith(Sink.seq(), materializer)
.toCompletableFuture().get(4, TimeUnit.SECONDS);
for(Message m:messages) {
System.out.println("Found Message ID: " + m.getJMSMessageID());
try {
m.acknowledge();
} catch(JMSException jmsException) {
System.out.println("Acknowledgement Failed for Message ID: " + m.getJMSMessageID() + " (" + jmsException.getLocalizedMessage() + ")");
}
}
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
} catch (TimeoutException e1) {
e1.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
This code prints:
Found Message ID: ID:jmstest-43178-1503343061195-1:26:1:1:1
Found Message ID: ID:jmstest-43178-1503343061195-1:27:1:1:1