2

I am using the Camel JMS component for request-reply communication with MQ. For some of my requests I can receive n messages in reply. How can I aggregate these reply messages?

I thought of using the Aggregator pattern with an aggregation strategy, but I can't, as I am not sure of the number of messages that can come in reply.

Can the community help me understand the right way to do this? I searched Google but couldn't find anything useful. Below is my sample route code:

from("direct:"+routeName).routeId(routeName)
                        .setHeader("JMSCorrelationID", constant(UUID.randomUUID().toString()))
                        .circuitBreaker()
                            .resilience4jConfiguration()
                            .minimumNumberOfCalls(3)
                        .end()
                        .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                            .log("${body}")
                            .unmarshal(customerDetailsOutBound)
                            .process(new Processor() {
                                    @Override
                                    public void process(Exchange exchange) throws Exception {
                                        System.out.println(exchange.getIn().getBody().toString());
                                    }
                            })
                        .onFallback().process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Store this message to backup");
                            }
                        })
                        .end();

Looking forward to some good insights from the community. Thank you.

gomzee
  • Request-reply is working fine here. My only concern is how to make a thread wait for the other replies, or how to aggregate the other replies based on the same correlation ID. – gomzee Jun 22 '20 at 07:34

3 Answers

1

Message flow

  1. Your first route sends a message to the CAMELDEMO queue and starts waiting for a single aggregated message on a new queue, CAMELDEMO_AGGREGATED_REPLY.
  2. The component that receives the message on CAMELDEMO starts sending responses to the CAMELDEMOREPLY queue and also indicates how many responses will be sent.
  3. The second route below listens on CAMELDEMOREPLY, aggregates the messages, and sends the aggregated message to CAMELDEMO_AGGREGATED_REPLY.
  4. Your first route, which was waiting on CAMELDEMO_AGGREGATED_REPLY, receives the single aggregated reply and sends it back.

Original route, updated to wait for the reply on CAMELDEMO_AGGREGATED_REPLY

...
.to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMO_AGGREGATED_REPLY")
.log("${body}")
.unmarshal(customerDetailsOutBound)
.process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            System.out.println(exchange.getIn().getBody().toString());
        }
})
....

Second route to aggregate the messages

// One-way endpoints: requestTimeout only applies to InOut, so it is omitted here
from(mqComponentBeanName+"://CAMELDEMOREPLY?exchangePattern=InOnly")
.aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
.to(mqComponentBeanName+"://CAMELDEMO_AGGREGATED_REPLY?exchangePattern=InOnly");

public final class MyAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange)
    {
        // Note: oldExchange is null for the first message of a group,
        // so handle that case before using it
        ...
        //Here you check your flag regarding the number of responses
        // you were supposed to receive, and if it is met
        // complete the aggregation by setting it to true
        oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        ...
        return oldExchange;
    }
}
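
The grouping that the aggregator performs can be modelled in plain Java, without Camel. The sketch below is only an illustration of the idea, not Camel API: the class `ReplyBuckets`, its `add` method, and the `expectedCount` parameter are hypothetical names, and it assumes each reply carries the number of replies to expect, as mentioned in the comments.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of correlation-based aggregation; not Camel API. */
final class ReplyBuckets {
    // One bucket of reply bodies per correlation ID
    private final Map<String, List<String>> buckets = new HashMap<>();

    /**
     * Adds one reply to its bucket. Returns the complete group once
     * expectedCount replies have arrived for that correlation ID,
     * otherwise Optional.empty().
     */
    Optional<List<String>> add(String correlationId, String body, int expectedCount) {
        List<String> bucket = buckets.computeIfAbsent(correlationId, id -> new ArrayList<>());
        bucket.add(body);
        if (bucket.size() >= expectedCount) {
            buckets.remove(correlationId); // group is done, free the bucket
            return Optional.of(bucket);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ReplyBuckets aggregator = new ReplyBuckets();
        // Replies for two different requests arrive interleaved
        System.out.println(aggregator.add("1001", "a", 2)); // group 1001 still incomplete
        System.out.println(aggregator.add("1003", "x", 1)); // group 1003 completes at once
        System.out.println(aggregator.add("1001", "b", 2)); // group 1001 completes now
    }
}
```

The aggregating route does not need to know any correlation ID in advance: each incoming reply names its own group, and a group is released only when its own count is reached.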
  • Kavitha, I can't do it this way, as request-reply works on a single thread: once a reply is received, it moves forward. – gomzee Jun 22 '20 at 07:26
  • Yes, the client cannot receive multiple streaming responses. For the client it is an HTTP request. The downstream system receives requests via IBM MQ and sends responses on the reply queue. For some requests the downstream system can send multiple responses for the same correlation ID, so the responsibility of this route is to aggregate all those responses before returning a response to the client. Also, the reply message from MQ has a field that says how many messages to expect in response. That could be used to **loop**, but the problem is how to consume the remaining messages, as request-reply works on a single thread. – gomzee Jun 22 '20 at 07:52
  • In your point 3, how do you get the JMSCorrelationId for which the messages have to be aggregated? It is only available in point 1. – gomzee Jun 22 '20 at 10:00
  • Yes, the replier needs to set it, agreed. But the second route is not aware of the particular correlation ID for which it has to aggregate messages. – gomzee Jun 22 '20 at 10:08
  • Yes. For instance, in my first route I send a message to MQ with JMSCorrelationId=1001, and I should expect 10 reply messages for it. Now, the second route will not know that it has to aggregate for **JMSCorrelationId=1001**. – gomzee Jun 22 '20 at 10:18
  • It will receive every message and group them by correlation ID. First it receives ```JMSCorrelationId=1001``` and puts it in a new bucket; then the second message is ```JMSCorrelationId=1003```, which is new, so it creates another bucket; the third message comes with ```JMSCorrelationId=1001```, so it goes in the first bucket. When the bucket for a correlation ID is full, it is sent as the response. Read about the aggregator pattern if you are not sure – Kavithakaran Kanapathippillai Jun 22 '20 at 10:24
  • 1
    Now I get your point. You meant aggregating every message in the second route based on correlation ID. Yes, this can be a viable solution. But it has the overhead of creating a new queue plus the extra read and write operations. As it's IBM MQ, that has its own costs. So this solution has to be assessed based on cost. – gomzee Jun 22 '20 at 10:38
1

I was able to solve this with a single route. The solution may not be that neat, but it works and fulfils the purpose. I used loopDoWhile, and in the processor inside loopDoWhile I fetch the message from the queue using plain Java code.

from("direct:"+routeName).routeId(routeName)
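                    // constant(...) would be evaluated once at route build time, giving every exchange the same ID
                    .process(e -> e.getIn().setHeader("JMSCorrelationID", UUID.randomUUID().toString()))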
                    .circuitBreaker()
                        .resilience4jConfiguration()
                        .minimumNumberOfCalls(3)
                    .end()
                    .to(mqComponentBeanName+"://CAMELDEMO?exchangePattern=InOut&requestTimeout=10000&replyTo=CAMELDEMOREPLY")
                        .log("${body}")
                        .unmarshal(customerDetailsOutBound)
                        .process(new Processor() {
                                @Override
                                public void process(Exchange exchange) throws Exception {
                                    System.out.println(exchange.getIn().getBody().toString());

                                    // Placeholder: take the expected message count from the first response
                                    int msgCount = getMsgCountFromFirstResponse;
                                    if (msgCount > 1) {
                                        // One reply is already consumed, so msgCount-1 remain for the loop
                                        exchange.getIn().setHeader("COUNTER", 0);
                                        exchange.getIn().setHeader("MSG_COUNT", msgCount-1);
                                        exchange.setProperty("connectionFactory", connectionFactory);
                                    }
                                }
                        })
                    .loopDoWhile(simple("${headers.COUNTER} != ${headers.MSG_COUNT}"))
                            .process(simpleJMSConsumerProcess)
                        .end().endCircuitBreaker()
                    .onFallback().process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Store this message to backup");
                        }
                    })

Code inside processor:

ConnectionFactory connectionFactory = (ConnectionFactory) exchange.getProperty("connectionFactory");
    String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);
    Connection connection = connectionFactory.createConnection();

    try {
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("CAMELDEMOREPLY?consumer.priority=10");
        // The selector restricts this consumer to replies for the current request
        MessageConsumer consumer = session.createConsumer(queue, "JMSCorrelationID = '"+correlationId+"'");
        connection.start();
        // receive() without a timeout blocks until a matching reply arrives
        TextMessage textMsg = (TextMessage) consumer.receive();
        System.out.println(textMsg);
        System.out.println("Received: " + textMsg.getText());
        exchange.getIn().setHeader("COUNTER", exchange.getIn().getHeader("COUNTER", Integer.class)+1);
    } finally {
        // Closing the connection also closes its sessions and consumers
        connection.close();
    }
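
Since `consumer.receive()` blocks indefinitely when a reply never arrives, a bounded wait may be safer. The sketch below models that loop in plain Java, with a `BlockingQueue` standing in for the reply queue; it is not JMS or Camel code, and `RemainingReplies`, `collect`, and the timeout value are hypothetical. In JMS the counterpart would be `MessageConsumer.receive(long timeout)`, which returns null when the timeout expires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-Java stand-in for reading the remaining replies; not JMS or Camel API. */
final class RemainingReplies {

    /**
     * Polls up to 'remaining' replies, waiting at most timeoutMs for each.
     * Stops early if a poll times out or the thread is interrupted,
     * so the caller is never blocked forever.
     */
    static List<String> collect(BlockingQueue<String> replyQueue, int remaining, long timeoutMs) {
        List<String> replies = new ArrayList<>();
        for (int counter = 0; counter < remaining; counter++) {
            String reply;
            try {
                reply = replyQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (reply == null) {
                break; // timed out: return what has arrived so far
            }
            replies.add(reply);
        }
        return replies;
    }

    public static void main(String[] args) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>(List.of("reply-2", "reply-3"));
        // Three replies are still expected, but only two ever arrive
        System.out.println(collect(replyQueue, 3, 100));
    }
}
```

The caller can compare the size of the returned list with the expected count to decide whether to treat the response as partial, for example by handing it to the fallback.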
gomzee
0

Well, traditional request-reply has by design just 1 reply message. The thread waiting for the response stops listening as soon as the first reply arrives.
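
As a rough plain-Java model of that behaviour (not JMS or Camel code; `SingleReplyRequest` and its methods are hypothetical names), a pending request can be pictured as a future that only the first reply can complete:

```java
import java.util.concurrent.CompletableFuture;

/** Plain-Java model of a single-reply request; not JMS or Camel API. */
final class SingleReplyRequest {
    private final CompletableFuture<String> pending = new CompletableFuture<>();

    /** Delivers a reply; returns false when an earlier reply already completed the request. */
    boolean onReply(String body) {
        return pending.complete(body);
    }

    /** The value handed to the waiting caller: always the first reply. */
    String result() {
        return pending.join();
    }

    public static void main(String[] args) {
        SingleReplyRequest request = new SingleReplyRequest();
        System.out.println(request.onReply("reply-1")); // accepted
        System.out.println(request.onReply("reply-2")); // ignored, request already completed
        System.out.println(request.result());
    }
}
```

Any further replies for the same request have no waiting caller left, which is why they need a separate consumer or an aggregation step.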

With JMS correlation IDs (no dedicated thread per request) it would theoretically be possible to receive multiple replies for the same request, but I don't know if this really works/is allowed in JMS.

Update based on comments

You write in comments that you are able to receive multiple JMS replies for one request and that you even get the number of answers to expect.

If this all works, you can use the Aggregator EIP in your Camel route to collect all responses before sending a reply to the caller.

The Aggregator is highly configurable. You can decide how to combine the responses and you can also define multiple completion criteria (timeout, number of messages etc).

burki
  • However the main problem of the multi-reply solution remains the same. How long do you wait until you treat a request as done and expect no more replies? **[We have a timeout for this. So this is not an issue]** Coming back to the single reply solution: you have to aggregate all reply messages where they are created. **[Can't do this aggregate on sender side. As that's not in our control.]** – gomzee Jun 22 '20 at 07:29
  • I updated my answer since you provided more information about your case in comments. – burki Jun 22 '20 at 08:05
  • @burki/@Kavitha I also understand the Aggregator EIP and can use it. The only problem I foresee is how to solve this: **With JMS correlation IDs (no dedicated thread per request) it would theoretically be possible to receive multiple replies for the same request, but I don't know if this really works/is allowed in JMS.** – gomzee Jun 22 '20 at 08:10