I am trying to set up a durable subscriber so that my messages persist in the topic even after a server restart.

But during configuration I am getting an XML validation error.

Here is my configuration XML:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adapter that routes topicChannel messages to the topic.myTopic topic; the outbound-channel-adapter
        automatically finds the configured connectionFactory bean (by naming convention)
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            subscription-durable="true"
                                            durable-subscription-name="myDurableSubscription"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

</beans>

But in the message-driven-channel-adapter, the following attributes give an error.

It says:

Multiple annotations found at this line:

  • cvc-complex-type.3.2.2: Attribute 'subscription-durable' is not allowed to appear in element 'int-jms:message-driven-channel-adapter'.
  • cvc-complex-type.3.2.2: Attribute 'durable-subscription-name' is not allowed to appear in element 'int-jms:message-driven-channel-adapter'.

But in many other examples I can see that the attributes below work fine:

  • subscription-durable="true"

  • durable-subscription-name="myDurableSubscription"

So what might be wrong with my configuration?

EDIT: Spring Integration dependencies in pom.xml

<!-- Spring Integration -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>4.2.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>4.2.0.RELEASE</version>
</dependency>

EDIT:

Please see the attached image.

Also see my log, which shows that the target method was invoked, as is supposed to happen when a message is consumed by the client. Please help.

mahendra kawde

1 Answer


What version of Spring Integration are you using? Support for durable subscriptions was added in version 2.1.0.RELEASE, over 4 years ago.

The current version is 4.2.0.RELEASE.

EDIT:

I forgot to mention you need a client id for durable subscriptions.

Did you look at the log messages? This seems pretty clear...

14:12:39.557 WARN [jmsIn.container-1][org.springframework.jms.listener.DefaultMessageListenerContainer] Setup of JMS message listener invoker failed for destination 'topic://topic.demo' - trying to recover. Cause: You cannot create a durable subscriber without specifying a unique clientID on a Connection

Add a clientId to your connection factory...

    <property name="clientId" value="myClient"/>
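
As a rough sketch of where such a property could live: the property name depends on which factory class it is set on. Spring's `SingleConnectionFactory` (and its subclass `CachingConnectionFactory`) has a `setClientId` setter, so `clientId` is the property name there, whereas ActiveMQ's own `ActiveMQConnectionFactory` has `setClientID`, so the name would be `clientID` if set directly on that bean. The example below assumes the broker URL from the question and wraps the ActiveMQ factory in a Spring one. The bean id `amqConnectionFactory` is made up for illustration.

```xml
<!-- Sketch only: the bean id "amqConnectionFactory" is a hypothetical name for this example -->
<bean id="amqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Spring wrapper that shares one underlying connection between the adapters -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
    <!-- Durable subscriptions need a unique client id on the connection -->
    <property name="clientId" value="myClient"/>
</bean>
```

Because the wrapper reuses a single connection, the client id should only be registered with the broker once, even though both the outbound and the message-driven adapter use the factory. Whether this fits a given setup is something to verify against your own broker.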
Gary Russell
  • I am using 4.2.0.RELEASE only – mahendra kawde Oct 16 '15 at 12:20
  • I am also wondering, because the Spring documentation itself mentions this here: http://docs.spring.io/spring-integration/reference/htmlsingle/#jms-message-driven-channel-adapter – mahendra kawde Oct 16 '15 at 12:22
  • I have updated question by adding spring integration dependencies being used in pom.xml – mahendra kawde Oct 16 '15 at 12:28
  • Oh - it looks like you're not using a Spring-aware IDE (STS, IDEA etc.); the error is only in the IDE, and it will be OK at runtime. Basic Eclipse doesn't know how to find the right schema. We generally recommend using version-less schemas, but if you can't use a Spring-aware IDE, add the version to the schema - `spring-integration-jms-4.2.xsd` - you should also NOT be using old core Spring schemas (2.5). – Gary Russell Oct 16 '15 at 12:32
  • I just added the schema version as you mentioned and the earlier errors disappeared, but now there is one more error: `Referenced file contains errors (http://www.springframework.org/schema/integration/spring-integration-4.2.xsd). For more information, right click on the message in the Problems View and select "Show Details..."` – mahendra kawde Oct 16 '15 at 12:38
  • If I ignore this error (since, as you mentioned, it will be OK at runtime), or the couple of errors mentioned in the question above, and run my code, then messages get enqueued but the admin console shows 0 messages dequeued. I mean the messages are not consumed by the client if I simply ignore these errors. – mahendra kawde Oct 16 '15 at 12:46
  • See my edit - I just modified the [sample app](https://github.com/garyrussell/spring-integration-samples/commit/9e9cf66cd5bf3e2f6a45b690abb8fc22cd760134) to use a topic and it works exactly as expected. – Gary Russell Oct 16 '15 at 18:19
  • I saw the sample app and made changes in my config as you suggested. Now my app can consume the messages, but the ActiveMQ console shows 2 consumers although I have only 1, and the number of messages dequeued always shows 0 in the Topics tab even though the app is actually consuming the messages. I can say this because I have a logger in place there. I am attaching an image for reference in my question. Please see the updated question. – mahendra kawde Oct 19 '15 at 06:12
  • Also, can you suggest some good examples based on **spring-integration JMS** for a **queue** and for a **topic**? – mahendra kawde Oct 19 '15 at 09:46
  • Earlier I started with the examples given here https://github.com/garyrussell/spring-integration-samples/tree/master/basic/jms but there are some configurations that I did not understand. – mahendra kawde Oct 19 '15 at 09:49
  • I can't speak to the issues you see with the AMQ console. It's probably best to ask a new question about that, targeted to the AMQ folks. If you need more explanation about spring integration, it would be better to ask new questions. – Gary Russell Oct 19 '15 at 13:53
  • How can I access the embedded ActiveMQ broker? Also, I am referring to the https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms example. – mahendra kawde Oct 27 '15 at 08:23
  • But I am not able to understand some of the tags, like `stream:stdout-channel-adapter` and `beans:beans profile="default"` and so on. Could you please help me understand these? – mahendra kawde Oct 27 '15 at 08:29
  • I can see these were all committed by you, but the XML configuration and its significance in this project are not clear to me. Looking for your help. Thanks. – mahendra kawde Oct 27 '15 at 08:36
  • Please don't keep adding comments here; start a new question if you need something else explained. You can't "access" the embedded broker; as you can see, I changed it to use an external broker for the purposes of this answer. It's just a sample app; the profiles change the behavior based on whether it's run as a `main` or a test case (see the Spring Reference Manual - chapter 'Environment Abstraction' for information on profiles). When running as a `main`, the sample writes to `stdout`; when run as a test, the test case gets the result from a queue channel to verify it was sent OK. – Gary Russell Oct 27 '15 at 12:35
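
For reference, the schema advice from the comments (versioned Spring Integration schemas for a non-Spring-aware IDE, and no 2.5 core schemas) could look roughly like the header below. The `-4.2` integration schema names come from the comments above. Using the version-less `spring-beans.xsd` and `spring-context.xsd` for core Spring is an assumption, and the unused `oxm` and `int-jme` prefixes from the question are left out. As the comment thread shows, plain Eclipse may still report "Referenced file contains errors" for the core integration schema, so this is a sketch and not a guaranteed fix for the IDE warnings.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.2.xsd">

    <!-- Bean, channel and adapter definitions from the question go here -->

</beans>
```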