java rabbitmq spring-integration spring-amqp helix

SimpleMessageListenerContainer - recover from failure


What would be the easiest way to shut down a SimpleMessageListenerContainer (created programmatically, not as a bean) on any possible error (missing queue, connection problem, etc.) and create a new one, re-declaring all the bindings at runtime?

I'm using Helix for partition management and have one listener per partition. Another possibility would be to reuse the existing SimpleMessageListenerContainer (rather than always creating a new one), but in that case I would need to retry the queue re-declaration and rebinding after any failure.

Also, there seem to be different kinds of exceptions: fatal (e.g. a queue deleted at runtime) and non-fatal (connection lost). How can I handle both situations at once?

Which of these two options would be easier?

UPDATED

private Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();

@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent listenerContainerConsumerFailedEvent) {

    boolean fatal = listenerContainerConsumerFailedEvent.isFatal();
    SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)listenerContainerConsumerFailedEvent.getSource();

    if(fatal){
        AtomicBoolean sd = shuttingDown
                .computeIfAbsent(listenerContainer, v -> new AtomicBoolean(false));
        if(sd.compareAndSet(false, true)) {
            System.out.println("RECREATING");
            String[] qn = listenerContainer.getQueueNames();
            String q = qn[0];
            recreateQueue(q);
            listenerContainer.stop();
            listenerContainer.start();
            //delete from shuttingDown ?
        }
        else{
            System.out.println("RECREATING_NOT");
        }
    }
    else{
        System.out.println("NON_FATAL");
    }
}

And the output:

NON_FATAL
NON_FATAL
NON_FATAL
NON_FATAL
22:36:44.044 [SimpleAsyncTaskExecutor-7] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer received fatal=false...
...

RECREATING
RECREATING_NOT
RECREATING_NOT
RECREATING_NOT
22:36:44.057 [SimpleAsyncTaskExecutor-6] ERROR org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Stopping container from aborted consumer

Solution

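  • Add an ApplicationEventPublisher to the container. Because the container is created programmatically rather than as a bean, you have to call setApplicationEventPublisher(...) on it yourself. The container then publishes ListenerContainerConsumerFailedEvents, which have a boolean fatal property, so one listener can handle both the fatal and the non-fatal case.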

    EDIT

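    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication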
    public class So47357940Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47357940Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(AmqpAdmin admin) {
            return args -> admin.deleteQueue("so47357940");
        }
    
        @RabbitListener(queues = "so47357940")
        public void listen(String in) {
            System.out.println(in);
        }
    
        private final Map<SimpleMessageListenerContainer, AtomicBoolean> shuttingDown = new ConcurrentHashMap<>();
    
        @Bean
        public ApplicationListener<ListenerContainerConsumerFailedEvent> failures(AmqpAdmin admin,
                RabbitTemplate template) {
            return event -> {
                if (event.isFatal()) {
                    SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
                    AtomicBoolean sd = this.shuttingDown.computeIfAbsent(container, v -> new AtomicBoolean());
                    if (sd.compareAndSet(false, true)) {
                        System.out.println("RECREATING");
                        String[] qn = container.getQueueNames();
                        String q = qn[0];
                        admin.declareQueue(new Queue(q));
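                        // better to use a shared exec
                        ExecutorService exec = Executors.newSingleThreadExecutor();
                        exec.execute(() -> {
                            // the container stops itself after the fatal failure; wait for that to finish
                            while (container.isRunning()) {
                                // should probably give up at some point
                                try {
                                    Thread.sleep(100);
                                }
                                catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                            container.start();
                            // test message to show that the restarted consumer receives again
                            template.convertAndSend("so47357940", "foo");
                            // allow a later failure of this container to trigger another restart
                            this.shuttingDown.remove(container);
                        });
                        // let the single-use thread exit once the restart task has run
                        exec.shutdown();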
                    }
                    else {
                        System.out.println("RECREATING_NOT");
                    }
                }
                else {
                    System.out.println("NON_FATAL");
                }
            };
        }
    
    }
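
    The shuttingDown map above keeps several failure events for the same container from triggering several restarts (the question's output shows one RECREATING followed by three RECREATING_NOT), and removing the entry after container.start() answers the "delete from shuttingDown ?" comment in the update. A minimal, Spring-free sketch of just that guard follows. RecoveryGuard, tryAcquire and release are hypothetical names chosen for illustration, not part of Spring AMQP.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical helper (not a Spring AMQP class): allows one recovery per key at a time.
    final class RecoveryGuard<K> {

        private final Map<K, AtomicBoolean> inProgress = new ConcurrentHashMap<>();

        /** Returns true for exactly one caller per key until release(key) is called. */
        boolean tryAcquire(K key) {
            return inProgress
                    .computeIfAbsent(key, k -> new AtomicBoolean())
                    .compareAndSet(false, true);
        }

        /** Clears the claim so that a later failure of the same key can start a new recovery. */
        void release(K key) {
            inProgress.remove(key);
        }
    }
    ```

    In the listener, the container would be the key: tryAcquire(container) replaces the computeIfAbsent/compareAndSet pair, and release(container) belongs at the end of the restart task, ideally in a finally block so that a failed restart does not block every later attempt.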
    

    Here are the debug logs I get...

    RECREATING
    2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,3)
    2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitAdmin$$Lambda$144/1094003461 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
    2017-11-17 17:38:53.893 DEBUG 42372 --- [cTaskExecutor-2] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'so47357940'
    2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Cancelling Consumer@3a813488: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
    2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.listener.BlockingQueueConsumer   : Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
    2017-11-17 17:38:53.901 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.c.CachingConnectionFactory       : Closing cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2)
    2017-11-17 17:38:53.903 ERROR 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
    2017-11-17 17:38:53.903 DEBUG 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Shutting down Rabbit listener container
    2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    2017-11-17 17:38:53.903  INFO 42372 --- [cTaskExecutor-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
    2017-11-17 17:38:54.003 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.l.SimpleMessageListenerContainer : Starting Rabbit listener container.
    2017-11-17 17:38:54.004 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Starting consumer Consumer@3a2547b8: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0
    2017-11-17 17:38:54.005 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Started on queue 'so47357940' with tag amq.ctag-3wMG_13-68ibLL05ir3ySA: Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
    2017-11-17 17:38:54.005 DEBUG 42372 --- [ool-1-thread-11] o.s.a.r.listener.BlockingQueueConsumer   : ConsumeOK : Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
    2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.a.r.c.CachingConnectionFactory       : Creating cached Rabbit Channel from AMQChannel(amqp://guest@127.0.0.1:5672/,4)
    2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback RabbitTemplate$$Lambda$146/1108520685 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473]
    2017-11-17 17:38:54.008 DEBUG 42372 --- [pool-4-thread-1] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [], routingKey = [so47357940]
    2017-11-17 17:38:54.012 DEBUG 42372 --- [ool-1-thread-12] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for Consumer@3a2547b8: tags=[{amq.ctag-3wMG_13-68ibLL05ir3ySA=so47357940}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3), conn: Proxy@48c9a496 Shared Rabbit Connection: SimpleConnection@3bdd3f4a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55473], acknowledgeMode=AUTO local queue size=0
    2017-11-17 17:38:54.012 DEBUG 42372 --- [cTaskExecutor-3] o.s.a.r.listener.BlockingQueueConsumer   : Received message: (Body:'foo' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=so47357940, deliveryTag=1, consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, consumerQueue=so47357940])
    2017-11-17 17:38:54.015 DEBUG 42372 --- [cTaskExecutor-3] .a.r.l.a.MessagingMessageListenerAdapter : Processing [GenericMessage [payload=foo, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=so47357940, amqp_contentEncoding=UTF-8, amqp_deliveryTag=1, amqp_consumerQueue=so47357940, amqp_redelivered=false, id=b614d9e6-1744-b600-7d86-ca9c51ad5844, amqp_consumerTag=amq.ctag-3wMG_13-68ibLL05ir3ySA, contentType=text/plain, timestamp=1510958334014}]]
    foo
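
The two comments in the restart task ("better to use a shared exec" and "should probably give up at some point") could be addressed along the following lines. This is only a sketch under assumptions: StopAwaiter, awaitStopped and the poll/timeout parameters are hypothetical names, not Spring AMQP API, and the running check is passed in as a BooleanSupplier so that the example needs nothing beyond the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Spring AMQP class): polls on a shared scheduler
// instead of sleeping on a dedicated thread, and gives up after a timeout.
final class StopAwaiter {

    private final ScheduledExecutorService scheduler;

    StopAwaiter(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Completes with true once isRunning returns false, or with false
     * if timeoutMillis elapses while it still returns true.
     */
    CompletableFuture<Boolean> awaitStopped(BooleanSupplier isRunning, long pollMillis, long timeoutMillis) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        // Run even the first check on the scheduler, never on the calling (consumer) thread.
        scheduler.execute(() -> poll(isRunning, pollMillis, deadline, result));
        return result;
    }

    private void poll(BooleanSupplier isRunning, long pollMillis, long deadline,
            CompletableFuture<Boolean> result) {
        try {
            if (!isRunning.getAsBoolean()) {
                result.complete(true);   // stopped: safe to restart
            }
            else if (System.nanoTime() - deadline >= 0) {
                result.complete(false);  // still running after the timeout: give up
            }
            else {
                // Re-check later without blocking a thread in between.
                scheduler.schedule(() -> poll(isRunning, pollMillis, deadline, result),
                        pollMillis, TimeUnit.MILLISECONDS);
            }
        }
        catch (RuntimeException e) {
            // Includes rejection by a scheduler that has been shut down.
            result.completeExceptionally(e);
        }
    }
}
```

With a real container, the supplier would presumably be container::isRunning, and the returned future would decide whether to call container.start() or to log the failure and leave the container stopped.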