0

I need to save m messages in rabbit mq. I am using acknowledgeMode as MANUAL in SimpleMessageListenerContainer. This is helping me store the value in unacked in rabbit mq. But even after job completion the messages remain in the unacked. I need the messages to be deleted once job gets completed successfully. Please help me find a solution

<beans:bean id="PartitionHandler" class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler" init-method="afterPropertiesSet" scope="job">
        <beans:property name="messagingOperations" ref="messagingTemplate"></beans:property>
        <beans:property name="stepName" value="slave" />
        <beans:property name="gridSize" value="${spring.gridsize}" />
        <beans:property name="pollInterval" value="5000"></beans:property>
        <beans:property name="jobExplorer" ref="jobExplorer"></beans:property>
        <beans:property name="replyChannel" ref="outboundReplies"></beans:property>
    </beans:bean>

    <beans:bean id="PeriodicTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
        <beans:constructor-arg value="5000"></beans:constructor-arg>
    </beans:bean> 


<beans:bean id="requestQueue" class="org.springframework.amqp.core.Queue">
    <beans:constructor-arg name="name" value="testQueue">
    </beans:constructor-arg>
    <beans:constructor-arg name="durable" value="true">
    </beans:constructor-arg> 
</beans:bean>

<int:poller id="PollerMetadata" default="true" trigger="PeriodicTrigger" task-executor="taskExecutor"></int:poller>  

<beans:bean id="amqptemplate" 
    class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <beans:property name="connectionFactory" ref="rabbitConnFactory" />
    <beans:property name="routingKey" value="testQueue"/>
    <beans:property name="queue" value="testQueue"/>
</beans:bean>

<beans:bean id="amqpOutboundEndpoint" class="org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint">
    <beans:constructor-arg ref="amqptemplate"/>
    <beans:property name="expectReply" value="false"></beans:property>
    <beans:property name="routingKey" value="testQueue"></beans:property>
    <beans:property name="outputChannel" ref="inboundRequests"></beans:property>
</beans:bean>

<int:service-activator ref="amqpOutboundEndpoint" input-channel="outboundRequests"/>

<beans:bean id="SimpleMessageListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <beans:constructor-arg ref="rabbitConnFactory"/>
    <beans:property name="queueNames" value="testQueue"></beans:property>
    <beans:property name="autoStartup" value="false"></beans:property> 
    <beans:property name="acknowledgeMode" value="MANUAL"></beans:property>
    <beans:property name="concurrentConsumers" value="5"></beans:property>
</beans:bean>



<beans:bean id="AmqpInboundChannelAdapter" class="org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter"  init-method="afterPropertiesSet">
    <beans:constructor-arg ref="SimpleMessageListenerContainer"/>
    <beans:property name="outputChannel" ref="inboundRequests"></beans:property>
</beans:bean>

<beans:bean id="StepExecutionRequestHandler" class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
    <beans:property name="jobExplorer" ref="jobExplorer"/>
    <beans:property name="stepLocator" ref="stepLocator"/>
</beans:bean>

<int:service-activator ref="StepExecutionRequestHandler" input-channel="inboundRequests" output-channel="outboundStaging"/>


<bean id="rabbitConnFactory" 
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg><value>localhost</value></constructor-arg>
    <property name="username" value="guest" />
    <property name="password" value="guest" />
    <property name="virtualHost" value="/" />
    <property name="port" value="5672" />
</bean>


<bean id="admin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
    <constructor-arg ref="rabbitConnFactory" />
</bean>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
 <constructor-arg ref="outboundRequests" />
  <property name="receiveTimeout" value="60000000"/>
</bean>

<bean id="outboundRequests" class="org.springframework.integration.channel.DirectChannel" >
<property name="maxSubscribers" value="5"></property>
</bean>


<int:channel id="outboundReplies" scope="job"><int:queue/></int:channel>

<bean id="outboundStaging" class="org.springframework.integration.channel.NullChannel"></bean>

<bean id="inboundRequests" class="org.springframework.integration.channel.QueueChannel"></bean>

<bean id="stepLocator" class="org.springframework.batch.integration.partition.BeanFactoryStepLocator"/>

1 Answers1

0

When using MANUAL acks, you are responsible for the acknowledgment.

See my answer to this question.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179
  • Hi Gary ! Thank you for taking time and replying to my question. But since i am new to rabbit mq i have few more doubts. I am using native spring batch and not spring boot. All the jobs that are in my application have xml configuration and even rabbit mq is onfigured using xml. So, I am confused on how to call channel.basicAck(). Pleas e find below my listener code – Deepthi Warrier Apr 25 '19 at 13:29
  • Don't put stuff like that in comments; it's not formatted and too hard to read. edit your question instead. You need to show much more configuration too. e.g. I don't see a listener there, just the container. – Gary Russell Apr 25 '19 at 13:33
  • I have added the rabbit mq configuration code in the question. – Deepthi Warrier Apr 25 '19 at 13:48
  • There is no built-in support for MANUAL acks in Spring Boot. However, with that configuration you don't need them since the batch step runs on the container thread so the messages won't be ack'd until the step execution completes. – Gary Russell Apr 25 '19 at 14:54
  • Thanks Gary ! But in my case the messages are not getting acknowledged even after job completion. – Deepthi Warrier Apr 26 '19 at 06:57
  • So, change the ack mode to AUTO. – Gary Russell Apr 26 '19 at 13:02