
I am learning Spring Integration JMS. I ran into a problem where my topic does not persist pending messages that have not yet been consumed by the client.

I start ActiveMQ, then use a REST client to invoke the producer 50 times so that 50 messages are enqueued on the topic. On the consumer side I have added a 5-second sleep so that messages are consumed at 5-second intervals. Partway through, I stop ActiveMQ; by then the client has consumed some of the messages, say 15 out of 50. When I restart ActiveMQ, I expect the topic to have retained the 35 pending messages, but I cannot see them in the admin console under the Topics tab.

Here is my configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adapter that routes topicChannel messages to the topic.myTopic topic; the outbound-channel-adapter
        automatically finds the configured connectionFactory bean (by naming convention)
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

I also read that the default delivery mode is persistent, but in my case it does not seem to work.

EDIT:

Following Gary Russell's answer, I added the attributes

  • subscription-durable="true"
  • durable-subscription-name="mySubscription"

to <int-jms:message-driven-channel-adapter>, and now I get these XML validation errors:

  • cvc-complex-type.3.2.2: Attribute 'subscription-durable' is not allowed to appear in element 'int-jms:message-driven-channel-adapter'.

  • cvc-complex-type.3.2.2: Attribute 'durable-subscription-name' is not allowed to appear in element 'int-jms:message-driven-channel-adapter'.


Please Help

mahendra kawde

2 Answers


That is how topics work by default; see the JMS specification.

Topics are publish/subscribe; only subscribers that are connected when a message is published receive it.

If you publish 5 messages, start the consumer, and then publish another 5, the consumer only receives the second 5.

If you kill the broker before the consumer has received all 5, then on restart the broker sees that there are no consumers and purges the messages.

You can change this behavior by using durable subscriptions, in which case the broker will indeed retain messages for each such subscription, even if not currently connected.

To configure this with Spring Integration, set subscription-durable on the message-driven channel adapter and give it a unique subscription-name.
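
As a rough sketch of what that could look like in the question's configuration: the attribute names below are the ones used in the question's EDIT (`subscription-durable`, `durable-subscription-name`); which name your schema accepts for the subscription name may depend on the Spring Integration version. The `listenerConnectionFactory` bean, its `clientID` value, and the subscription name are placeholders chosen for illustration. A durable subscription is identified by the client ID together with the subscription name, so this sketch gives the listener its own connection factory with a fixed client ID instead of sharing one with the producer.

```xml
<!-- Hypothetical dedicated connection factory for the durable subscriber;
     the bean id and the clientID value are illustrative assumptions -->
<bean id="listenerConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    <property name="clientID" value="myClientId"/>
</bean>

<!-- Same adapter as in the question, now registered as a durable subscriber -->
<int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                        connection-factory="listenerConnectionFactory"
                                        channel="getPayloadChannel"
                                        destination-name="topic.myTopic"
                                        pub-sub-domain="true"
                                        subscription-durable="true"
                                        durable-subscription-name="mySubscription" />
```

Note that the broker only starts retaining messages once the consumer has connected at least once and registered the subscription; anything published before that is still not kept.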

Gary Russell
  • Thanks for your answer; it is indeed helpful. One question: do I need any configuration beyond what you mentioned, or will adding `subscription-durable` to the message-driven channel adapter with a unique `subscription-name` be enough? – mahendra kawde Oct 15 '15 at 13:12
  • You mean something like this `` Right ? – mahendra kawde Oct 15 '15 at 13:13
  • Don't put code/config in comments; it doesn't render well. Edit your question instead. See the descriptions of the attributes; `subscription-durable="true"` and `subscription-name="foo"` should be all you need. – Gary Russell Oct 15 '15 at 13:15
  • Hi Gary thanks for your reply. I checked for attributes and added in my configuration but now my xml gives me error. I am just editing question with error mentioned there. – mahendra kawde Oct 16 '15 at 06:15
  • Please check EDIT in question above. – mahendra kawde Oct 16 '15 at 10:33
  • You must be using a very old version. Support for durable subscriptions was added in version 2.1.0.RELEASE, over 4 years ago. The current version is 4.2.0.RELEASE. – Gary Russell Oct 16 '15 at 12:18
  • I am using 4.2.0.RELEASE only – mahendra kawde Oct 16 '15 at 12:20
  • In that case, this is an IDE problem only; use versioned schemas if you can't use a spring-aware IDE - See [this answer](http://stackoverflow.com/questions/33163613/durable-subscription-activemq/33170343#33170343). – Gary Russell Oct 16 '15 at 12:33
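
For illustration, pinning the schema versions might look like the sketch below. The `4.2` suffix is an assumption that matches the 4.2.0.RELEASE version mentioned in the comments; adjust it to the Spring Integration version on your classpath.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.2.xsd">

    <!-- adapters and channels as in the question -->

</beans>
```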

Topics in ActiveMQ are not durable or persistent by default, so if one of your consumers is down, it loses the messages published in the meantime.

To make a topic subscription durable, create a durable consumer with a unique client ID per consumer.

However, this does not scale out in a microservices architecture: multiple pods or replicas of the same consumer cause problems, because durable consumers cannot be load balanced.

To handle this scenario, ActiveMQ offers virtual topics. Details follow.

Have your producer send messages to a topic named VirtualTopic.MyTopic. Note: you must follow this naming convention with the default ActiveMQ configuration, although the convention can be overridden.

To consume the messages with multiple consumers (A and B here), the consumer-side destinations must also follow a naming convention, e.g. Consumer.A.VirtualTopic.MyTopic and Consumer.B.VirtualTopic.MyTopic. These consumer-side destinations are queues. Both consumers receive the messages published to the topic above, and messages are load balanced across multiple replicas of the same consumer.
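
A minimal sketch of that setup in Spring Integration XML, reusing the channel names from the question. It assumes ActiveMQ's default virtual topic naming; the consumer name `A`, the adapter id `consumerAAdapter`, and the destination `VirtualTopic.MyTopic` are illustrative. The producer publishes to a topic, while each consumer group reads from its own queue, so the consumer adapter does not set `pub-sub-domain`.

```xml
<!-- Producer side: publish to the virtual topic -->
<int-jms:outbound-channel-adapter channel="topicChannel"
                                  destination-name="VirtualTopic.MyTopic"
                                  pub-sub-domain="true" />

<!-- Consumer A: reads from the queue that ActiveMQ maps to the virtual topic;
     replicas that use this same queue name share its messages -->
<int-jms:message-driven-channel-adapter id="consumerAAdapter"
                                        channel="getPayloadChannel"
                                        destination-name="Consumer.A.VirtualTopic.MyTopic" />
```

A second consumer group would use the same adapter definition with `Consumer.B.VirtualTopic.MyTopic` as its destination.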

I hope this helps you fix your problem with ActiveMQ topics.

chetan mahajan