0

I use spring amqp in my project, and I use implements ChannelAwareMessageListener for re-send and handle exception to make rabbit listener more stable:

   public abstract class AbstractMessageListener implements ChannelAwareMessageListener {


    @Autowired
    private Jackson2JsonMessageConverter messageConverter;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** where comsumer really do biz */
    public abstract void receiveMessage(Message message, MessageConverter messageConverter);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        Long deliveryTag = messageProperties.getDeliveryTag();
        Long consumerCount = redisTemplate.opsForHash().increment(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
                messageProperties.getMessageId(), 1);
        try {
            receiveMessage(message, messageConverter);
            channel.basicAck(deliveryTag, false);             
            redisTemplate.opsForHash().delete(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
                    messageProperties.getMessageId());
        } catch (Exception e) {

            if (consumerCount >= MQConstants.MAX_CONSUMER_COUNT) {               
                channel.basicReject(deliveryTag, false);
            } else {               
                Thread.sleep((long) (Math.pow(MQConstants.BASE_NUM, consumerCount)*1000));
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }

then we can receive by extend our AbstractMessageListener like that:

public class BizMessageListener extends AbstractMessageListener  {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void receiveMessage(Message message, MessageConverter messageConverter) {
        //do our own biz
    }
}

but one day my boss said this way is too Invasion you mush use annotation instead, so I found something like that: Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration

where I can use annotation as

@RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {

but how can I encapsulate @RabbitListener to a high level to combine my own re-send msg code in my first code sample , for example there is a annotation as RabbitResenderListener

 @RabbitResenderListener(queues = "so38728668")
            public void receive(Message msg)
                    throws IOException {
 // just do biz 
}

this annotation give the method re-send msg and error-handle ability, so that the method only do biz. thks

vvsueprman
  • 143
  • 1
  • 2
  • 10

1 Answers1

0

Not sure what you expect from us, but seems for me annotations are not for extensions and extracting abstractions like we can do with classes.

Anyway I can share some idea you may try.

There is a method level @RabbitHandler annotation. So, you can mark a method in super class with that and do all the infrastructure logic. The target implementation should be marked with the particular @RabbitListener on the class to bring the required queues configuration. The @RabbitHandler method will call the polymorphic method from the inheritor.

Does it sound reasonable?

Artem Bilan
  • 113,505
  • 11
  • 91
  • 118
  • Yeah, that what I want to do, instead I do not want to make a super class, because if I make a super class, ever listener we will write must extend this super class, as it's called intrusive. What I want to do is to write a class extend some amqp source code, and write my infrastructure logic in this class. then ever class with @RabbitListener can only do business logic which means non-intrusive. – vvsueprman Jan 24 '18 at 02:17
  • Would be great to see some PoC, but anyway the `@RabbitListener` is already AMQP source and it is very tied to the method contract and it is very close to the listener container. You might need to take a look into Spring Integration for the next level of abstraction between services: https://projects.spring.io/spring-integration/ – Artem Bilan Jan 24 '18 at 02:35
  • Hi, I found class MessagingMessageListenerAdapter.java in amqp source code, I think maybe I can override onMessage() method and add my own infrastructure logic. whether this work? And how to do that, can spring recognize my new class by extend MessagingMessageListenerAdapter ? or there are other methods to overlap source code of amqp? thx – vvsueprman Jan 25 '18 at 09:06
  • No, you can't do that. That's very specific code and tied with the whole annotation processing. You may consider to implement some AOP Advice though, which can be applied to selected business services and may call an appropriate infrastructure logic. The `@RabbitListener` has `containerFactory()` option, where that `AbstractRabbitListenerContainerFactory` can be supplied with the `adviceChain`. – Artem Bilan Jan 25 '18 at 17:31