
I have been working on implementing a pub/sub service using Spring Data Redis. Following examples I found on the web, I got something working. My problem is that I need absolute reliability when a message is not processed (either an exception is thrown or a logic error occurs). In that case I need the message to return to the topic for a retry (by another subscriber or even the same one).

I have looked at several questions, particularly the following: "Redis Pub/Sub with Reliability" and "How to implement Redis Multi-Exec by using Spring-data-Redis".

I understand that I should use MULTI and EXEC to manage a transaction, but I couldn't get it to work.

Here is a simplified version of my code:

@Configuration
@PropertySource(value = { "classpath:application.properties" })
public class RedisConfig {

    @Autowired
    Environment env;

    @Bean
    public MessageListenerAdapter messageListener() {
        MyMessageListenerAdapter messageListeneradapter = new MyMessageListenerAdapter(new RedisMessageSubscriber());
        messageListeneradapter.afterPropertiesSet();
        return messageListeneradapter;
    }

    @Bean(name = "RedisMessagePublisherBean")
    public RedisMessagePublisher messagePublisher() {
        return new RedisMessagePublisher();
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setValueSerializer(new GenericToStringSerializer<Object>(Object.class));
        template.setEnableTransactionSupport(true);
        template.setConnectionFactory(lettuceConnectionFactory());
        return template;
    }

    @Bean
    public RedisMessageListenerContainer redisContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(lettuceConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }

    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
        LettuceConnectionFactory factory = new LettuceConnectionFactory();
        factory.setValidateConnection(true);
        factory.setDatabase(1);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic("MQ_TOPIC");
    }

    public class MyMessageListenerAdapter extends MessageListenerAdapter {

        public MyMessageListenerAdapter(RedisMessageSubscriber redisMessageSubscriber) {
            super(redisMessageSubscriber);
        }

        @Override
        public void onMessage(Message message, byte[] pattern) {
            RedisTemplate<?, ?> template = redisTemplate();

            template.execute(new SessionCallback<String>() {

                @Override
                public <K, V> String execute(RedisOperations<K, V> operations) throws DataAccessException {
                    operations.multi();
                    System.out.println("got message");

                    String result = doSomeLogic(message);
                    if (result == null)
                        operations.discard();
                    else
                        operations.exec();

                    return null;
                }
            });
        }
    }
}

My requirement is that if a message fails to process (I can live without handling runtime exceptions etc.; a strictly logical error would suffice for now), it will return to the topic.
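
To make the requirement concrete, here is a minimal in-memory sketch of the behaviour I am after. It is plain Java, not Redis or Spring code: `RetrySketch`, `drain` and `maxDeliveries` are hypothetical names used only for illustration, and the `Deque` merely stands in for the topic. A `null` result from the handler plays the role of the logical error in `doSomeLogic`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class RetrySketch {

    // Hypothetical stand-in for the topic: a plain in-memory queue, not Redis.
    private final Deque<String> pending = new ArrayDeque<>();

    public void publish(String message) {
        pending.addLast(message);
    }

    // Hands each pending message to the handler. A null result means a
    // logical failure, so the message goes back to the end of the queue
    // for a later retry. maxDeliveries caps the total number of handler
    // calls so a message that always fails cannot loop forever.
    public List<String> drain(Function<String, String> handler, int maxDeliveries) {
        List<String> processed = new ArrayList<>();
        int deliveries = 0;
        while (!pending.isEmpty() && deliveries < maxDeliveries) {
            String message = pending.pollFirst();
            deliveries++;
            String result = handler.apply(message);
            if (result == null) {
                pending.addLast(message); // failed: requeue for a retry
            } else {
                processed.add(result);
            }
        }
        return processed;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RetrySketch topic = new RetrySketch();
        topic.publish("a");
        topic.publish("b");

        // Assumed handler: "a" fails on its first delivery only.
        Set<String> failedOnce = new HashSet<>();
        List<String> processed = topic.drain(
                m -> (m.equals("a") && failedOnce.add(m)) ? null : m.toUpperCase(),
                10);

        System.out.println(processed + ", still pending: " + topic.pendingCount());
    }
}
```

In this sketch a failed message is simply requeued and picked up again later, which is what I would like a subscriber (the same one or another) to be able to do with `MQ_TOPIC`.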

Any help is appreciated. Thanks!
