
What would be the easiest way to shut down a SimpleMessageListenerContainer (created programmatically, not as a bean) on any possible error (missing queue, connection problem, etc.) and create a new one, re-declaring all the bindings at runtime?

I'm using Helix for partition management and have one listener per partition. Another possibility would be to reuse the existing SimpleMessageListenerContainer (rather than always creating a new one), but in that case I would need to retry queue re-declaration and rebinding after any failure.

Also, there seem to be different kinds of exceptions: fatal (e.g. queue deleted at runtime) and non-fatal (connection lost). How can I handle both situations at once?

Which of these two options would be easier?

UPDATED

private Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent listenerContainerConsumerFailedEvent) {

    boolean fatal = listenerContainerConsumerFailedEvent.isFatal();
    SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)listenerContainerConsumerFailedEvent.getSource();

    if(fatal){
        AtomicBoolean sd = shuttingDown
                .computeIfAbsent(listenerContainer, v -> new AtomicBoolean(false));
        if(sd.compareAndSet(false, true)) {
            System.out.println("RECREATING");
            String[] qn = listenerContainer.getQueueNames();
            String q = qn[0];
            recreateQueue(q);
            listenerContainer.stop();
            listenerContainer.start();
            //delete from shuttingDown ?
        }
        else{
            System.out.println("RECREATING_NOT");
        }
    }
    else{
        System.out.println("NON_FATAL");
    }
}

and the output

NON_FATAL
NON_FATAL
NON_FATAL
NON_FATAL
22:36:44.044 [SimpleAsyncTaskExecutor-7] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer received fatal=false...\
...

RECREATING
RECREATING_NOT
RECREATING_NOT
RECREATING_NOT
22:36:44.057 [SimpleAsyncTaskExecutor-6] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Stopping container from aborted consumer
Bojan Vukasovic
  • What is the reason for re-creating the SimpleMessageListenerContainer bean in case of a failure? Doesn't Spring AMQP contain recovery/reconnect functionality? – Krzysztof Tomaszewski Oct 04 '19 at 06:39
  • @KrzysztofTomaszewski The question was about shutting down the container and starting it again at some later time (not immediately). – Bojan Vukasovic Oct 04 '19 at 07:23
  • That is clear to me. I was curious whether this is something generally needed after experiencing a RabbitMQ broker failure. – Krzysztof Tomaszewski Oct 04 '19 at 08:20
  • @KrzysztofTomaszewski I needed this for partition changes: to shut down the SimpleMessageListenerContainer when some other component takes over the partition. If there is a failure, Spring will reconnect automatically. – Bojan Vukasovic Oct 04 '19 at 09:05
  • OK. I just have a case where the SimpleMessageListenerContainer somehow silently (no ERROR log message) disconnected from the RabbitMQ broker and stopped receiving messages present in the queue. Still digging... – Krzysztof Tomaszewski Oct 04 '19 at 11:34

1 Answer


Add an ApplicationEventPublisher to the container; each ListenerContainerConsumerFailedEvent it publishes has a boolean fatal property (isFatal()).

EDIT

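import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;

@SpringBootApplication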
public class So47357940Application {

    public static void main(String[] args) {
        SpringApplication.run(So47357940Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(AmqpAdmin admin) {
        return args -> admin.deleteQueue("so47357940");
    }

    @RabbitListener(queues = "so47357940")
    public void listen(String in) {
        System.out.println(in);
    }

    private final Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

    @Bean
    public ApplicationListener<ListenerContainerConsumerFailedEvent> failures(AmqpAdmin admin,
            RabbitTemplate template) {
        return event -> {
            if (event.isFatal()) {
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
                AtomicBoolean sd = this.shuttingDown.computeIfAbsent(container, v -> new AtomicBoolean());
                if (sd.compareAndSet(false, true)) {
                    System.out.println("RECREATING");
                    String[] qn = container.getQueueNames();
                    String q = qn[0];
                    admin.declareQueue(new Queue(q));
                    // better to use a shared exec
                    ExecutorService exec = Executors.newSingleThreadExecutor();
                    exec.execute(() -> {
                        while (container.isRunning()) {
                            // should probably give up at some point
                            try {
                                Thread.sleep(100);
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        container.start();
                        template.convertAndSend("so47357940", "foo");
                        this.shuttingDown.remove(container);
                    });
                }
                else {
                    System.out.println("RECREATING_NOT");
                }
            }
            else {
                System.out.println("NON_FATAL");
            }
        };
    }

}
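
The polling loop above carries the comment "should probably give up at some point". A minimal sketch of one way to bound that wait, using only the JDK, might look like the following. The class name `BoundedWait`, the method `awaitStopped`, and its parameters are hypothetical names chosen for illustration; they are not part of Spring AMQP.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class BoundedWait {

    private BoundedWait() {
    }

    /**
     * Polls isRunning until it returns false or the timeout elapses.
     * Returns true if the condition cleared in time; returns false if the
     * deadline passed or the waiting thread was interrupted.
     */
    static boolean awaitStopped(BooleanSupplier isRunning, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (isRunning.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed: give up
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            }
            catch (InterruptedException e) {
                // restore the flag and stop waiting instead of looping again
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Inside the executor task, the `while (container.isRunning())` loop could then be replaced by a call such as `BoundedWait.awaitStopped(container::isRunning, Duration.ofSeconds(10), Duration.ofMillis(100))`, calling `container.start()` only when it returns true. The ten-second limit is an arbitrary assumption. This only bounds the wait; it does not address the `start()` race with concurrent consumers discussed in the comments.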

Here are the debug logs I get...

RECREATING
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$144/1094003461 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'so47357940'
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@3a813488: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
2017-11-17 17:38:53.903 ERROR 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2017-11-17 17:38:53.903 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
2017-11-17 17:38:54.003 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
2017-11-17 17:38:54.004 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3a2547b8: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'so47357940' with tag amq.ctag-3wMG_13-68ibLL05ir3ySA: Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.005 DEBUG 42372 --- [ool-1-thread-11] o.s.a.r.listener.BlockingQueueConsumer   : ConsumeOK : Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$146/1108520685 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [], routingKey = [so47357940]
2017-11-17 17:38:54.012 DEBUG 42372 --- [ool-1-thread-12] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
2017-11-17 17:38:54.012 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=so47357940, deliveryTag=1, consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, consumerQueue=so47357940])
2017-11-17 17:38:54.015 DEBUG 42372 --- [cTaskExecutor-3] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=so47357940, amqp_contentEncoding=UTF-8, amqp_deliveryTag=1, amqp_consumerQueue=so47357940, amqp_redelivered=false, id=b614d9e6-1744-b600-7d86-ca9c51ad5844, amqp_consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, contentType=text/plain, timestamp=1510958334014}]]
foo
Gary Russell
  • Thanks for the prompt response. I already tried this approach, but I have a problem with multiple calls to this method: on failure there are at least 6 calls. Is it enough in this case just to recreate the binding and call start() on the same ListenerContainer to make it work again? – Bojan Vukasovic Nov 17 '17 at 21:22
  • Yes, you can restart a failed container if the fatal failure has been resolved. – Gary Russell Nov 17 '17 at 21:28
  • Can you please check the updated code in the question above? It seems I still get only "Stopping container from aborted consumer" and no listeners, even though the queue is actually recreated. – Bojan Vukasovic Nov 17 '17 at 21:41
  • You can't do the stop/start on the listener thread; you also don't need to `stop()`; better to wait until the container stops itself. See my edit. – Gary Russell Nov 17 '17 at 22:14
  • I am guessing it's a race condition and the `start()` doesn't do anything because the listener thread is still active. We should probably take a look at it to see if we can at least log an error if you attempt to `start()` the container on a stale listener thread [AMQP-785](https://jira.spring.io/browse/AMQP-785). – Gary Russell Nov 17 '17 at 22:33
  • I just tried your code, but it still does not run. Is this probably what you are talking about? I mean, I see that isRunning=false, but start() does nothing. – Bojan Vukasovic Nov 17 '17 at 22:35
  • Works for me - I added DEBUG logs so you can compare. – Gary Russell Nov 17 '17 at 22:42
  • Your original also works for me, but when I add concurrency, `@RabbitListener(queues = "so47357940", concurrency = "4")`, I have a problem (start does not work). – Bojan Vukasovic Nov 17 '17 at 22:53
  • Yeah - me too - definitely a race condition - if I add an additional `Thread.sleep(5000)` before `start()` it works. I changed that JIRA issue to a `Bug`. – Gary Russell Nov 17 '17 at 23:09
  • Yes. OK, thanks then... That was the problem all along :) I suppose this is the current workaround: not a 100% working fix, but good enough... – Bojan Vukasovic Nov 17 '17 at 23:14
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/159306/discussion-between-bojanv55-and-gary-russell). – Bojan Vukasovic Nov 18 '17 at 20:07