I have two microservices communicating through RabbitMQ; one of them sends RPC requests (using RabbitTemplate#sendAndReceive) to the other. I noticed that the app cannot recover if the broker has been down for a few minutes.
Here's how I configure the connection factory and template:
@Bean
public AbstractConnectionFactory connectionFactory() {
    return new CachingConnectionFactory(createRabbitConnectionFactory(properties));
}

private static com.rabbitmq.client.ConnectionFactory createRabbitConnectionFactory(RabbitProperties properties) {
    final com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
    connectionFactory.setHandshakeTimeout(60000);
    connectionFactory.setHost(properties.getHost());
    connectionFactory.setPort(properties.getPort());
    connectionFactory.setUsername(properties.getUsername());
    connectionFactory.setPassword(properties.getPassword());
    connectionFactory.setVirtualHost(properties.getVirtualhost());
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate(
    AbstractConnectionFactory connectionFactory
) {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setReplyTimeout(10000);
    rabbitTemplate.setUserCorrelationId(true);
    return rabbitTemplate;
}
And here's how I use RabbitTemplate:
final Message responseMessage = rabbitTemplate.sendAndReceive(
    routingKey,
    new Message(serializationService.toBytes(request),
        MessagePropertiesBuilder
            .newInstance()
            .setContentType("my-content")
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
            .setCorrelationId(correlationData.getId())
            .build()
    ),
    correlationData
);
To reproduce, I put load on the API method that triggers this RPC, then shut down the broker and wait a few minutes. Right after the broker starts again, I see a lot of log lines like this:
SimpleConsumer [queue=amq.rabbitmq.reply-to, index=1091, consumerTag=amq.ctag-1DtEyYlNIMSNowMMsoPYNQ identity=57919ab] started
And then all new requests lead to an error:
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
In the RabbitMQ management UI I see that my connection has created 2047 channels, all of them idle. Only restarting the app fixes the problem.
What do you think is happening here and how can I make the client automatically recover?
I use spring-boot-starter-amqp version 2.7.5 (so spring-messaging 5.3.2 and spring-rabbit 2.4.7).
I've tried configuring channelCheckoutTimeout:
cachingConnectionFactory.setChannelCheckoutTimeout(10000);
cachingConnectionFactory.setChannelCacheSize(200);
But I just get a different error and end up in the same situation:
org.springframework.amqp.AmqpTimeoutException: No available channels
I found the problem. Your application properties must include the following:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    cache:
      channel:
        checkout-timeout: 30s
Pay attention to the checkout-timeout: 30s entry.
This configures a limit on the number of channels opened by the CachingConnectionFactory:
/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
* connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
* does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
* new connection to be created with the new limit.
* <p>
* Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
* @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
* @since 1.4.2
* @see #setConnectionLimit(int)
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
More info in docs: https://docs.spring.io/spring-amqp/reference/amqp/connections.html#cachingconnectionfactory
Starting with version 1.4.2, the CachingConnectionFactory has a property called channelCheckoutTimeout. When this property is greater than zero, the channelCacheSize becomes a limit on the number of channels that can be created on a connection. If the limit is reached, calling threads block until a channel is available or this timeout is reached, in which case an AmqpTimeoutException is thrown.
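To make the blocking behavior described above concrete, here is a minimal stand-alone sketch that models it with a plain java.util.concurrent.Semaphore. It is only a toy model under stated assumptions, not Spring AMQP code: the class ChannelLimiterSketch and its methods are hypothetical names, "channels" are just permits, no broker is involved, and IllegalStateException stands in for AmqpTimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Toy model of channel limiting (hypothetical, NOT Spring AMQP code).
 * Each permit represents one channel that may be open on the connection.
 */
class ChannelLimiterSketch {

    // One permit per allowed channel; plays the role of channelCacheSize.
    private final Semaphore permits;

    // Plays the role of channelCheckoutTimeout, in milliseconds.
    private final long checkoutTimeoutMs;

    ChannelLimiterSketch(int channelCacheSize, long checkoutTimeoutMs) {
        this.permits = new Semaphore(channelCacheSize);
        this.checkoutTimeoutMs = checkoutTimeoutMs;
    }

    /**
     * Waits up to the timeout for a free channel slot.
     * Throws IllegalStateException (a stand-in for AmqpTimeoutException)
     * if no slot becomes free in time.
     */
    void checkout() {
        try {
            if (!permits.tryAcquire(checkoutTimeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("No available channels");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failed checkout.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a channel", e);
        }
    }

    /** Returns a slot so that a waiting caller can proceed. */
    void release() {
        permits.release();
    }

    /** Number of slots that are currently free. */
    int available() {
        return permits.availablePermits();
    }
}
```

In this model, a slot that is checked out and never released stays unavailable forever, so every later caller waits for the full timeout and then fails. That is the shape of the "No available channels" error from the question, although the model does not explain why the real channels are not being returned.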
You can also adjust the cache size to fit your needs:
spring:
  rabbitmq:
    cache:
      channel:
        checkout-timeout: 30s
        size: 100
Note: the host/port and credentials shown above are the defaults, so there is no need to specify them in application.yml.