My application reads messages from AWS SQS and processes them. Here is the configuration for the sqs adapter:
@Configuration
@Slf4j
public class ApplicationConfig {
@Bean
public MessageChannel targetChannel() {
// return new ExecutorChannel(someTaskExecutor);
return new DirectChannel();
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.defaultClient();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(50);
executor.initialize();
return executor;
}
@Bean
public SqsMessageDrivenChannelAdapter sqsMDCA(
AmazonSQSAsync amazonSQSAsync,
MessageChannel targetChannel,
ThreadPoolTaskExecutor taskExecutor) {
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "sqs-queue-name");
adapter.setOutputChannel(targetChannel);
adapter.setMaxNumberOfMessages(10);
adapter.setTaskExecutor(taskExecutor);
return adapter;
}
}
Is DirectChannel a bottleneck in such situation?
You already use a TaskExecutor
for the SqsMessageDrivenChannelAdapter
, so all the processing is async. Therefore when you use a DirectChannel
downstream, the bottle would be there only if message processing is too long. However you need to take into account if that really what you want: shift the work to another thread where you might just lose a message in case of failure. When thread processing a message returns its control back to the SQS client, that message is going to be ack'ed. So, if failure happens downstream, that message is going to be lost. It is not always good to have a persistent queue exhausted for performance, but then you data is lost because of shifting to a different thread in your application.
The DirectChannel
cannot be a bottle neck - the handler subscribed to that one could be. But that's already outside of DirectChannel
logic. It just calls your subscriber directly in the thread sending message to this channel. Technically same as you would call another Java method from your current one.