I'm using Apache Camel, Spring Boot and WMQ in my application to connect to an external service. When the connection to this service fails, e.g. when the service is down, I roll the message back to the input queue using a JMS transaction. However, the messages are re-consumed immediately after the rollback, which is not what I want. I'd like them to be consumed again only after a delay, say 30 minutes, to give the external service enough time to come back up.
Is there a way to configure this with JMS transactions, perhaps by adding a Hystrix circuit breaker (or any other library) in Apache Camel?
Thanks
JMS Configuration:
@Bean
public UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter(final MQQueueConnectionFactory connectionFactory) {
    UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
    connectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
    return connectionFactoryAdapter;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory(
        final UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter) {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setCacheConsumers(true);
    cachingConnectionFactory.setCacheProducers(true);
    cachingConnectionFactory.setReconnectOnException(true);
    cachingConnectionFactory.setSessionCacheSize(1);
    cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
    return cachingConnectionFactory;
}

@Bean
public JmsTransactionManager jmsTransactionManager(final CachingConnectionFactory cachingConnectionFactory) {
    JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
    jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
    return jmsTransactionManager;
}

@Bean
public SpringTransactionPolicy springTransactionPolicy(final PlatformTransactionManager jmsTransactionManager) {
    SpringTransactionPolicy policy = new SpringTransactionPolicy();
    policy.setTransactionManager(jmsTransactionManager);
    policy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
    return policy;
}

@Bean
public JmsConfiguration jmsConfiguration(final CachingConnectionFactory cachingConnectionFactory,
        final JmsTransactionManager jmsTransactionManager) {
    JmsConfiguration config = new JmsConfiguration();
    config.setConnectionFactory(cachingConnectionFactory);
    config.setTransactionManager(jmsTransactionManager);
    config.setTransacted(true);
    config.setConcurrentConsumers(4);
    config.setMaxConcurrentConsumers(8);
    return config;
}

@Bean
public JmsComponent jmsComponent(final JmsConfiguration jmsConfiguration) {
    JmsComponent component = new JmsComponent();
    component.setConfiguration(jmsConfiguration);
    component.setPreserveMessageQos(true);
    component.setDeliveryPersistent(false);
    return component;
}

@Bean
public RedeliveryPolicy redeliveryPolicy() {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(2);
    redeliveryPolicy.setMaximumRedeliveryDelay(10000);
    redeliveryPolicy.setRedeliveryDelay(10000);
    return redeliveryPolicy;
}
Camel Routes (using SpringRouteBuilder):
// Error handler for transacted routes, reusing the values from the RedeliveryPolicy bean
errorHandler(
    transactionErrorHandler()
        .maximumRedeliveries(redeliveryPolicy.getMaximumRedeliveries())
        .redeliveryDelay(redeliveryPolicy.getRedeliveryDelay())
        .maximumRedeliveryDelay(redeliveryPolicy.getMaximumRedeliveryDelay()));

// Mark the transaction for rollback when the call to the external service fails
onException(MyException.class)
    .markRollbackOnly()
    .redeliveryPolicy(redeliveryPolicy)
    .useExponentialBackOff()
    .handled(true);

// Transacted consumer on the input queue
from("jms:queue:myQueue")
    .id("myRoute")
    .autoStartup(true)
    .tracing()
    .transacted()
    .process(bean)
    .stop();
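
To make the timing arithmetic concrete, here is a minimal sketch of the delays that a capped exponential backoff would produce for the values configured above (10000 ms initial delay, 10000 ms maximum, 2 redeliveries). It is plain Java with no Camel dependency and assumes the conventional formula of doubling the delay on each attempt up to the cap; it does not claim to reproduce Camel's internal calculation. The class name `BackoffSketch`, its methods, and the multiplier of 2.0 are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of Camel or of the application above.
class BackoffSketch {

    // Delay in milliseconds before the given redelivery attempt (1-based):
    // the initial delay multiplied once per previous attempt, capped at maxDelayMs.
    static long delayForAttempt(long initialDelayMs, double multiplier, long maxDelayMs, int attempt) {
        double delay = initialDelayMs * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    // Delays for attempts 1..attempts, in order.
    static List<Long> schedule(long initialDelayMs, double multiplier, long maxDelayMs, int attempts) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attempts; attempt++) {
            delays.add(delayForAttempt(initialDelayMs, multiplier, maxDelayMs, attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values from the configuration above: the cap equals the initial delay,
        // so under this formula the doubling never takes effect.
        System.out.println(schedule(10_000, 2.0, 10_000, 2));
        // prints [10000, 10000]

        // Assumed alternative: cap raised to 30 minutes (1,800,000 ms), 9 attempts.
        System.out.println(schedule(10_000, 2.0, 1_800_000, 9));
        // prints [10000, 20000, 40000, 80000, 160000, 320000, 640000, 1280000, 1800000]
    }
}
```

Under these assumptions, the two configured redeliveries would both happen within about 20 seconds in total, far short of the 30 minutes I'm after. Reaching a 30-minute delay under this formula needs a higher cap and more attempts, or simply a fixed 30-minute delay.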