
Spring Integration: TaskExecutor and MaxConcurrentConsumers on AmqpInboundChannelAdapter


My Spring Integration app consumes messages from RabbitMQ, transforms them into SOAP messages, and sends a web service request for each one.

The queue can deliver many messages per second (10 to 50), and after an application restart there could be many thousands of messages waiting in the RabbitMQ queue.

What is the best way to process up to 10 messages in parallel threads? Message ordering is nice to have but not required. If the web service answers with a business failure, the failed message should be retried until it succeeds.

The AMQP listener should not consume more messages from the queue than there are free threads in the task executor. I could define a task executor on a channel like this:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue);
}

IntegrationFlow integrationFlow = IntegrationFlows
  .from(amqpInboundChannelAdapter)
  .channel(c -> c.executor(exportFlowsExecutor))
  .transform(businessObjectToSoapRequestTransformer)
  .handle(webServiceOutboundGatewayFactory.getObject())
  .get();

Or is it enough to define a task executor on the AmqpInboundChannelAdapter like this, without defining a task executor on the channel in the flow definition?

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.taskExecutor(taskExecutor));
}

Or should I define a task executor for the channel as in option 1 and additionally set maxConcurrentConsumers on the channel adapter, like this:

@Bean
public AmqpInboundChannelAdapterSMLCSpec 
amqpInboundChannelAdapter(ConnectionFactory connectionFactory, Queue queue) 
{
  return Amqp.inboundAdapter(connectionFactory, queue)
             .configureContainer(c->c.maxConcurrentConsumers(10));
}

Solution

  • The best practice is to configure concurrency on the ListenerContainer and let all downstream processing happen on the container's threads. This gives you natural back-pressure: no more messages are pulled from the queue while a consumer thread is busy. It also avoids message loss. With an ExecutorChannel after the listener container, the consumer thread is freed as soon as it hands the message off, so the current message is acked as consumed even though downstream processing may still fail.
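
A minimal sketch of that recommendation, assuming the Spring Integration 5.x Java DSL used in the question. The class name ExportFlowConfig, the bean name exportFlow, and the parameter types chosen for the transformer and the web service gateway are illustrative assumptions, not taken from the original code. The prefetchCount(1) setting is an additional assumption, added so that each consumer holds only one unacknowledged message at a time.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.messaging.MessageHandler;

@Configuration
public class ExportFlowConfig { // hypothetical class name

    @Bean
    public IntegrationFlow exportFlow(
            ConnectionFactory connectionFactory,
            Queue queue,
            // Assumed types for the beans named in the question.
            GenericTransformer<Object, Object> businessObjectToSoapRequestTransformer,
            MessageHandler webServiceOutboundGateway) {

        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory, queue)
                        .configureContainer(c -> c
                                // Ten consumer threads owned by the listener container.
                                .concurrentConsumers(10)
                                // Assumption: keep at most one unacked message per consumer.
                                .prefetchCount(1)))
                // No executor channel here: the transformer and the gateway
                // run on the container's consumer thread.
                .transform(businessObjectToSoapRequestTransformer)
                .handle(webServiceOutboundGateway)
                .get();
    }
}
```

Because the whole flow runs on the consumer thread, a consumer does not take its next message until the web service call has returned. With the container's default acknowledge mode, a message is acked only after the flow completes without an exception, so a failed call does not silently drop it.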