I have been working on implementing a PUB/SUB service using spring-data-redis. After some research and following examples on the web, I got the basic setup working. My problem is that I need absolute reliability when a message is not processed (either an exception is thrown or a logic error occurs). In that case I need the message to return to the topic for a retry (by another subscriber or even the same one).
I have looked at several questions, particularly "Redis Pub/Sub with Reliability" and "How to implement Redis Multi-Exec by using Spring-data-Redis".
I understand that I should use MULTI and EXEC to manage a transaction, but I couldn't get it to work.
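For reference, this is my mental model of MULTI/EXEC/DISCARD, written as a small in-memory Java sketch. It is not Redis itself, and the `TxModel` class and its methods are made up purely for illustration. As far as I can tell, DISCARD only drops the commands queued since MULTI, so I don't see how it could put an already delivered Pub/Sub message back on the channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a Redis-style transaction (illustration only).
class TxModel {
    private final Map<String, String> store = new HashMap<>();
    private List<String[]> queued = null; // non-null while a MULTI block is open

    // MULTI: start queueing write commands instead of applying them.
    void multi() {
        queued = new ArrayList<>();
    }

    // SET: queued inside a transaction, applied immediately otherwise.
    void set(String key, String value) {
        if (queued != null) {
            queued.add(new String[] {key, value});
        } else {
            store.put(key, value);
        }
    }

    // EXEC: apply every queued command and return how many were applied.
    int exec() {
        if (queued == null) {
            throw new IllegalStateException("EXEC without MULTI");
        }
        for (String[] command : queued) {
            store.put(command[0], command[1]);
        }
        int applied = queued.size();
        queued = null;
        return applied;
    }

    // DISCARD: drop the queued commands; nothing outside the queue is touched.
    void discard() {
        queued = null;
    }

    // Reads the applied state directly (in this model, reads are never queued).
    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        TxModel tx = new TxModel();
        tx.multi();
        tx.set("a", "1");
        tx.discard();
        System.out.println(tx.get("a")); // prints "null": the queued SET was dropped

        tx.multi();
        tx.set("a", "2");
        tx.exec();
        System.out.println(tx.get("a")); // prints "2": the queued SET was applied
    }
}
```

In this model the transaction only governs commands that I issue myself; receiving a message is not one of them.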
Here is a simplified version of my code:
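import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;

@Configuration
@PropertySource(value = { "classpath:application.properties" })
public class RedisConfig {

    @Autowired
    Environment env;

    @Bean
    public MessageListenerAdapter messageListener() {
        MyMessageListenerAdapter messageListenerAdapter =
                new MyMessageListenerAdapter(new RedisMessageSubscriber());
        messageListenerAdapter.afterPropertiesSet();
        return messageListenerAdapter;
    }

    @Bean(name = "RedisMessagePublisherBean")
    public RedisMessagePublisher messagePublisher() {
        return new RedisMessagePublisher();
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setValueSerializer(new GenericToStringSerializer<Object>(Object.class));
        template.setEnableTransactionSupport(true);
        template.setConnectionFactory(lettuceConnectionFactory());
        return template;
    }

    @Bean
    public RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(lettuceConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }

    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
        LettuceConnectionFactory factory = new LettuceConnectionFactory();
        factory.setValidateConnection(true);
        factory.setDatabase(1);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("MQ_TOPIC");
    }

    public class MyMessageListenerAdapter extends MessageListenerAdapter {

        public MyMessageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {
            super(redisMessageSubscriber);
        }

        @Override
        public void onMessage(Message message, byte[] pattern) {
            RedisTemplate<?, ?> template = redisTemplate();
            template.execute(new SessionCallback<String>() {
                @Override
                public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
                    operations.multi();
                    System.out.println("got message");
                    // doSomeLogic(...) is omitted here; a null result means processing failed
                    String result = doSomeLogic(message);
                    if (result == null) {
                        // failure: this is where I expected the message to go back to the topic
                        operations.discard();
                    } else {
                        operations.exec();
                    }
                    return null;
                }
            });
        }
    }
}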
My requirement is that if a message fails to process, it is returned to the topic. I can live without handling runtime exceptions etc. for now; covering a strictly logical error would suffice.
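To make the requirement concrete, here is a minimal in-memory sketch of the behaviour I am after. It is plain Java with no Redis involved, and the `RetrySketch` class and all of its method names are hypothetical. A handler that returns `null` (or throws) counts as a failure, and the message goes back to the queue so the next consumer can pick it up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the topic: messages stay pending until processed successfully.
class RetrySketch {
    private final Deque<String> pending = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();

    void publish(String message) {
        pending.addLast(message);
    }

    // Hands one message to the handler. A null result or a runtime exception
    // counts as a failure, in which case the message is put back for a retry.
    // Returns true only if a message was processed successfully.
    boolean consumeOnce(Function<String, String> handler) {
        String message = pending.pollFirst();
        if (message == null) {
            return false; // nothing to consume
        }
        String result;
        try {
            result = handler.apply(message);
        } catch (RuntimeException e) {
            result = null;
        }
        if (result == null) {
            pending.addLast(message); // failure: message returns for a retry
            return false;
        }
        processed.add(message);
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        RetrySketch queue = new RetrySketch();
        queue.publish("order-1");
        boolean first = queue.consumeOnce(m -> null);       // first subscriber fails
        boolean second = queue.consumeOnce(m -> "ok:" + m); // retry succeeds
        System.out.println(first + " " + second + " " + queue.processed());
        // prints "false true [order-1]"
    }
}
```

What I can't work out is how to get this "put it back on failure" step with Redis Pub/Sub and spring-data-redis.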
Any help is appreciated, Thanks!