java spring spring-boot rabbitmq spring-amqp

Asynchronous publish ignoring errors and timeouts


I have a Spring Boot application that publishes response statistics to RabbitMQ for each query handled by the Controller. This works when the RabbitMQ server is reachable and accepts the message.

However, this publish should be optional: the client should not be affected if RabbitMQ is unreachable or fails to acknowledge the message. I would like it to fail silently (only logging the errors) without affecting the REST API's response time. Unfortunately, what I observe is a blocking call in AsyncRabbitTemplate::convertSendAndReceive when the connection to the RabbitMQ server times out.

We use Java 21, so as a workaround I ended up using an executor with virtual threads, which unblocks the publishing:

@Component
public class RabbitMQProducer {
  private static final Logger LOG = LoggerFactory.getLogger(RabbitMQProducer.class);
  private static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit(LOG).maxRate(10).every(Duration.ofSeconds(15)).build();

  private final AsyncRabbitTemplate rabbitTemplate;

  private final Executor executor = Executors.newVirtualThreadPerTaskExecutor();

  public RabbitMQProducer(AsyncRabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
  }

  public void sendMessage(String exchangeName, String routingKey, String message) {
    executor.execute(() -> send(exchangeName, routingKey, message));
  }

  private void send(String exchangeName, String routingKey, String message) {
    final RabbitConverterFuture<String> future =
        rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
    future.whenComplete((result, ex) -> {
      if (ex != null) {
        RATE_LIMITED_LOG.error(message, ex);
      }
    });
  }
}

This seems to work fine when RabbitMQ is not responding. (We deploy our application in Kubernetes; to test this, I blocked the network connection by adding NetworkPolicies that forbid it.)

So the question is: why is the executor with virtual threads needed? If I remove it:

  public void sendMessage(String exchangeName, String routingKey, String message) {
    final RabbitConverterFuture<String> future =
        rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
    future.whenComplete((result, ex) -> {
      if (ex != null) {
        RATE_LIMITED_LOG.error(message, ex);
      }
    });
  }

the method blocks and the end users are affected.

Here are the RabbitMQ configuration beans:

@Configuration
public class RabbitMQConfig {
  @Bean
  public ConnectionFactory connectionFactory(RabbitMqConfiguration config) {
    final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

    cachingConnectionFactory.setHost(config.getAmqpHost());
    cachingConnectionFactory.setPort(config.getAmqpPort());
    cachingConnectionFactory.setUsername(config.getAmqpUser());
    cachingConnectionFactory.setPassword(config.getAmqpPassword());

    cachingConnectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);

    return cachingConnectionFactory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    rabbitTemplate.setRetryTemplate(RetryTemplate.builder().maxAttempts(1).build());

    return rabbitTemplate;
  }

  @Bean
  public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
    return new AsyncRabbitTemplate(rabbitTemplate);
  }

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConsumerTagStrategy(q -> CONSUMER_TAG);
    return factory;
  }

  @Bean(name = "searchStreamExchange")
  public Exchange searchStreamExchange(RabbitMqConfiguration config) {
    return new TopicExchange(config.getSearchStreamExchange(), false, false);
  }
}


Solution

  • Yes, the logic in AsyncRabbitTemplate is like this:

        if (this.container != null) {
            this.template.convertAndSend(exchange, routingKey, object, this.messagePostProcessor, correlationData);
        }
    

    This runs before the RabbitConverterFuture is returned from convertSendAndReceive().

    So the call does block until it can obtain a connection to RabbitMQ and initiate the publish command.

    We may look into a fully async approach in the next major version of Spring AMQP.
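
Since the publish itself can block, offloading it (as the question's workaround does) is what keeps the request thread free. Below is a minimal JDK-only sketch of that idea, with one variation: it swaps the unbounded virtual-thread executor for a small pool with a bounded queue, so that when the broker is unreachable, pending stats messages are dropped instead of piling up. The names `BestEffortPublisher`, `blockingPublish` and `droppedCount` are hypothetical and not part of Spring AMQP; the blocking call is assumed to be passed in as a `Consumer<String>`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: runs a possibly blocking publish off the caller's thread
// and sheds load when the worker and its queue are both busy.
final class BestEffortPublisher {
  private final AtomicInteger dropped = new AtomicInteger();
  private final Consumer<String> blockingPublish;
  private final ThreadPoolExecutor executor;

  BestEffortPublisher(Consumer<String> blockingPublish, int threads, int queueCapacity) {
    this.blockingPublish = blockingPublish;
    this.executor = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        // Rejection handler: count the dropped message instead of throwing.
        (task, pool) -> dropped.incrementAndGet());
  }

  // Returns immediately; the caller never waits for the broker.
  void publish(String message) {
    executor.execute(() -> {
      try {
        blockingPublish.accept(message);
      } catch (RuntimeException ex) {
        // Assumption: failures are only logged, as the question requires.
        System.err.println("publish failed: " + ex.getMessage());
      }
    });
  }

  int droppedCount() {
    return dropped.get();
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

In the question's producer, `blockingPublish` would wrap the `rabbitTemplate.convertSendAndReceive(...)` call plus its `whenComplete` logging. Whether dropping is acceptable depends on how much the stats matter; the virtual-thread executor from the question plays the same offloading role but never rejects work.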