
I am using Spring Integration's message-driven-channel-adapter. My component consumes messages from a Tibco topic and publishes them to a RabbitMQ topic.

So the message flow is as follows: Tibco -> (subscribed to by) Component -> (published to) RabbitMQ

The service activator is shown below; as you can see, there is an input-channel and an output-channel. The bean storeAndForwardActivator holds the business logic (in the method createIssueOfInterestOratorRecord):

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

I also have a message-driven-channel-adapter. This adapter is invoked before the service activator:

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

Specifically, the container (shown below) holds the topic name to be used; it is a DefaultMessageListenerContainer:

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

This setup works fine. However, in some cases my consumer receives a 'rogue' message, i.e. one with an empty payload or with a HashMap payload (instead of a plain TextMessage). When that happens, an exception is caught at the DefaultMessageListenerContainer level, so the message never reaches my business bean (storeAndForwardActivator). Because of this, my component does not send an ACK back, and since this is a durable topic, messages build up on the topic, which is undesirable. Is there a way to ACK the message straight away, irrespective of whether an exception is caught at the DefaultMessageListenerContainer level?

Or should I introduce an error handler on the DefaultMessageListenerContainer? What is the best way to handle this? Any suggestions?

regards D

Update:

I tried adding an errorHandler to the org.springframework.jms.listener.DefaultMessageListenerContainer, as shown below:

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler is a bean, as shown below:

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErrorHandler implements ErrorHandler:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

@Service
public class MyErrorHandler implements ErrorHandler {

    private static final Log log = LogFactory.getLog(MyErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        if (t instanceof MessageHandlingException) {
            // instanceof already guarantees that t is non-null
            MessageHandlingException exception = (MessageHandlingException) t;
            Message<?> message = exception.getFailedMessage();
            Object payloadObject = message.getPayload();
            if (null != payloadObject) {
                log.info("Payload  is not null, type is: " + payloadObject.getClass());
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
    }

}

What I notice is that the exception is caught (when the subscriber consumes a rogue message), but I keep seeing this message logged in a loop:

    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

That is, since the transaction is not committed, the same message from the durable topic is consumed again and again. My aim is to send an ACK back to the broker after consuming the message (irrespective of whether an exception is caught or not).

I will try the error-channel tomorrow.

regards D

user2595169

1 Answer

Add an error-channel to the message-driven adapter; the ErrorMessage will contain a MessagingException payload that has two fields: the cause (exception) and the failedMessage.

If you use the default error-channel="errorChannel", the exception is logged.

If you want to do more than that you can configure your own error channel and add some flow to it.

EDIT:

Further to your comments below...

"payload must not be null" is not a stack trace; it's a message.

That said, "payload must not be null" looks like a Spring Integration message; it is probably thrown in the message listener adapter during message conversion, which is before we get to a point where the failure can go to the error-channel; such an exception will be thrown back to the container.

Turn on DEBUG logging and look for this log entry:

logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");

Also, provide a FULL stack trace.
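
For reference, one way to capture a full stack trace from inside an ErrorHandler is sketched below. The class StackTraces and its method fullStackTrace are hypothetical names used only for illustration; they are not part of Spring. The helper relies solely on the JDK and renders the throwable together with every nested "Caused by:" section.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical helper (not part of Spring): renders a Throwable, including
// all nested causes, as the text that printStackTrace() would produce.
final class StackTraces {

    private StackTraces() {
    }

    static String fullStackTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        try (PrintWriter writer = new PrintWriter(buffer)) {
            t.printStackTrace(writer); // also writes the "Caused by:" sections
        }
        return buffer.toString();
    }
}
```

Inside handleError(Throwable t) this could be used as log.info(StackTraces.fullStackTrace(t)). With commons-logging, passing the throwable as the second argument, e.g. log.error("Listener failed", t), should give a similar result without a helper.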

EDIT#2

So, I reproduced your issue by forcing the converted payload to null in a custom MessageConverter.

The DMLC error handler is called by the container after the transaction is rolled back so there's no way to stop the rollback.
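
To make that ordering concrete, here is a deliberately simplified, library-free model of it. This is not Spring's code: RollbackOrderDemo, deliver and maxDeliveries are hypothetical names, and the delivery cap exists only so the demo terminates (a real durable subscription would keep redelivering).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the ordering described above; it is NOT Spring's code.
final class RollbackOrderDemo {

    /** Delivers one message up to maxDeliveries times and records what happened. */
    static List<String> deliver(String message, Consumer<String> listener,
                                Consumer<Throwable> errorHandler, int maxDeliveries) {
        List<String> events = new ArrayList<>();
        for (int attempt = 1; attempt <= maxDeliveries; attempt++) {
            try {
                listener.accept(message);
                events.add("commit");      // success: the broker gets its ack
                return events;
            } catch (RuntimeException e) {
                events.add("rollback");    // 1. the transaction is rolled back first
                errorHandler.accept(e);    // 2. only then is the error handler called
                events.add("errorHandler");
                // The message is still unacknowledged, so it is delivered again.
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> events = deliver(null,
                m -> { if (m == null) throw new IllegalArgumentException("payload must not be null"); },
                t -> { /* logging here cannot undo the rollback */ },
                3);
        System.out.println(events);
        // prints [rollback, errorHandler, rollback, errorHandler, rollback, errorHandler]
    }
}
```

In this model the error handler only ever observes a failure that has already been rolled back, which matches the repeating log line in the question.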

We can add an option to the adapter to handle such errors differently but that will take some work.

In the meantime, a work-around would be to write a custom MessageConverter; something like the one in this Gist.
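
The Gist is not reproduced here. As a rough, library-free sketch of the idea (not the Gist's code), the converter can wrap the normal conversion and substitute a marker payload whenever the result would be null. NullSafeConverter and its delegate function are hypothetical stand-ins; in the real application the same logic would presumably sit in the fromMessage(...) method of a class implementing Spring's MessageConverter. Catching conversion exceptions as well is an extra assumption on top of the null check.

```java
import java.util.function.Function;

// Stand-alone model of the workaround; the names here are hypothetical.
// In a real application this logic would live in fromMessage(...) of a
// Spring JMS MessageConverter implementation.
final class NullSafeConverter<M> {

    // Marker payload; the downstream service has to recognise it.
    static final String BAD_MESSAGE = "Bad Message Received";

    private final Function<M, Object> delegate; // the normal conversion

    NullSafeConverter(Function<M, Object> delegate) {
        this.delegate = delegate;
    }

    Object fromMessage(M message) {
        try {
            Object payload = delegate.apply(message);
            // A null payload is what leads to "payload must not be null".
            return payload != null ? payload : BAD_MESSAGE;
        } catch (RuntimeException e) {
            // Assumption: treat a failed conversion like an empty payload,
            // so a marker goes downstream instead of an exception.
            return BAD_MESSAGE;
        }
    }
}
```

Because the converter always returns a non-null payload, the message can reach the flow and the transaction can commit normally.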

Then, your service will have to deal with handling the "Bad Message Received" payload.
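
A minimal sketch of that check on the service side follows. BadMessageAwareService and createRecord are hypothetical names, and payload.toString() merely stands in for the real business logic. Returning null relies on the usual service-activator behaviour of sending nothing to the output channel when the method returns null (assuming requires-reply is not enabled).

```java
// Hypothetical service-side check; names and "business logic" are placeholders.
final class BadMessageAwareService {

    // Must match the marker payload produced by the custom converter.
    static final String BAD_MESSAGE = "Bad Message Received";

    private BadMessageAwareService() {
    }

    /** Returns the record to forward, or null when nothing should be published. */
    static String createRecord(Object payload) {
        if (BAD_MESSAGE.equals(payload)) {
            // Consume and drop the rogue message so the flow ends normally.
            return null;
        }
        return payload.toString(); // placeholder for the real business logic
    }
}
```

Dropping the marker here means the rogue message is acknowledged once and never reaches RabbitMQ; logging it or routing it to a dead-letter destination are equally valid choices.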

You then provide the custom converter like this...

<jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue" acknowledge="transacted"
        message-converter="converter"
        channel="jmsInChannel" />

<beans:bean id="converter" class="foo.MyMessageConverter" />
Gary Russell
  • Hi Gary, I noticed that the exception for the ill-formatted message is caught within the container, i.e. bean id="oratorIssueOfInterestmessageListenerContainer", which is why I tried adding a custom error handler (as shown above). It appears the exception is caught 'gracefully' but the transaction is not committed, i.e. no ACK is sent to the broker, and since it's a durable topic the same message is consumed again and again in a loop. What's the best way to handle the exception and also send an ACK back to the broker? – user2595169 Mar 03 '15 at 09:04
  • You need to show the stack trace; an ObjectMessage containing a HashMap shouldn't be "badly formed". You shouldn't be using an `ErrorHandler` to handle `MessageHandlingException`; such exceptions should be handled in an `error-channel` as I said. – Gary Russell Mar 03 '15 at 13:51
  • You were right! The HashMap was a red herring. The error handler I put on the MessageListenerContainer shows the following message: payload must not be null. So it appears I am getting a JMS message with an empty payload - since I am using the following message selector: - and the payload is null/empty - an exception is thrown in the MessageListenerContainer (and caught by the errorHandler) - I think this is what is happening. – user2595169 Mar 03 '15 at 18:39
  • I have introduced an error-channel="errorChannel" on the message-driven-channel-adapter, but the error channel is not invoked at all since the exception is caught within the container itself! I need to find a way to send an ACK back to the broker even when there is an empty payload. – user2595169 Mar 03 '15 at 18:40
  • Hi Gary, please see this link: http://forum.spring.io/forum/spring-projects/integration/96229-consuming-unparseable-jms-messages I wonder if the same applies to Tibco as well? I am seeing similar behaviour, i.e. (1) Message has no payload (IBM MQ Header: content = nil): JMS Listener throws an Exception that does NOT get passed to the errorchannel, but instead rolls the transaction back, leaving the message on the Queue (for immediate and infinite retry). (2) Message has empty payload: JMS Listener or SI Base Transformer throws an Exception that DOES get passed to the error channel. – user2595169 Mar 03 '15 at 19:01
  • I added some more comments to the answer. – Gary Russell Mar 03 '15 at 20:03
  • I subsequently added a work-around; I have created a [JIRA issue](https://jira.spring.io/browse/INT-3670) to make this easier to deal with. You should also look at configuring the broker to send such poison messages to a DLQ after a number of failures. – Gary Russell Mar 03 '15 at 20:28
  • Thanks Gary! I can confirm this workaround works as you suggested. I am taking this forward. Thanks again. – user2595169 Mar 04 '15 at 12:30