I have a Spring AMQP (spring-rabbit) SimpleRabbitListenerContainerFactory configured with a custom ThreadPoolExecutor subclass:
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        factory.setAfterReceivePostProcessors(new RabbitMQConsumerInterceptor());
        factory.setBeforeSendReplyPostProcessors(new RabbitMQProducerInterceptor());
        factory.setTaskExecutor(
                new CustomThreadPoolExecutor(
                        20, 30, 60, SECONDS, new LinkedBlockingQueue<>()));
        return factory;
    }

    public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

        public CustomThreadPoolExecutor(
                int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            System.out.println(":afterExecute"); // This method is never executed
        }
    }

For some reason, the afterExecute method is never invoked. Can someone please shed some light on this?
It is invoked when you stop the listener container.
This executor is used in the SimpleMessageListenerContainer like this:
    AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
    processors.add(processor);
    getTaskExecutor().execute(processor);

That processor's run() method contains this loop:
    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
        mainLoop();
    }

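So, essentially, the task never leaves that loop under normal circumstances. A ThreadPoolExecutor calls afterExecute only once a task's run() method has returned (or thrown), and here each task is a long-lived consumer rather than a single message, so the hook fires only when the consumer stops.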
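
The behaviour can be reproduced without RabbitMQ. The following is a minimal sketch, not the container's actual code: the names AfterExecuteDemo, TrackingExecutor, and runDemo are made up for illustration, the `active` flag stands in for isActive(consumer), and Thread.sleep stands in for mainLoop(). It submits one long-running task and checks whether afterExecute has fired while the loop is still running, and again after the loop has been told to stop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AfterExecuteDemo {

    /** Hypothetical executor that records when afterExecute has been called. */
    static class TrackingExecutor extends ThreadPoolExecutor {
        final CountDownLatch afterExecuteCalled = new CountDownLatch(1);

        TrackingExecutor() {
            super(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            afterExecuteCalled.countDown();
        }
    }

    /** Returns {calledWhileRunning, calledAfterStop}. */
    static boolean[] runDemo() {
        TrackingExecutor executor = new TrackingExecutor();
        AtomicBoolean active = new AtomicBoolean(true); // stand-in for isActive(consumer)
        CountDownLatch started = new CountDownLatch(1);
        try {
            // Stand-in for the consumer task: loops until it is "stopped".
            executor.execute(() -> {
                started.countDown();
                while (active.get()) {
                    try {
                        Thread.sleep(10); // placeholder for mainLoop()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            started.await();

            // The task is still looping, so the hook has not fired yet.
            boolean calledWhileRunning =
                    executor.afterExecuteCalled.await(200, TimeUnit.MILLISECONDS);

            // Analogous to stopping the listener container: the loop exits,
            // run() returns, and only then does the pool call afterExecute.
            active.set(false);
            boolean calledAfterStop =
                    executor.afterExecuteCalled.await(5, TimeUnit.SECONDS);

            return new boolean[] {calledWhileRunning, calledAfterStop};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] result = runDemo();
        System.out.println("afterExecute called while task running: " + result[0]); // false
        System.out.println("afterExecute called after task stopped: " + result[1]); // true
    }
}
```

In other words, afterExecute is a per-task hook, and with this container a "task" is the whole lifetime of a consumer, not the handling of one message.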