
Spring Integration AWS Kinesis: message aggregator and release strategy


This is a follow-up to the question "Spring Integration AWS RabbitMQ Kinesis".

I have the following configuration. The first time I send a message to the input channel named kinesisSendChannel, the aggregator and its release strategy are invoked and the message is sent to Kinesis Streams; I verified this with debug breakpoints at several places. When I publish more messages to the same input channel, however, neither the release strategy nor the output processor is invoked, and nothing is sent to Kinesis. I am not sure why the aggregator flow runs only the first time and not for subsequent messages. For testing purposes, the TimeoutCountSequenceSizeReleaseStrategy is configured with a count of 1 and a timeout of 60 seconds. No specific MessageStore is configured. Could you help identify the issue?

@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
    return MessageChannels.direct().get();
}

@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
    return MessageChannels.direct().get();
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
        MessageChannel resultChannel,
        TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
    AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
    handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
    handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
    handler.setOutputProcessor(messageProcessor);
    handler.setOutputChannel(resultChannel);
    return handler;

}

@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    kinesisMessageHandler.setSync(true);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);

    return kinesisMessageHandler;
}



public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");

        final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
                .map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());

        putRecordsRequest.withRecords(putRecordsRequestEntry);
        // Return the populated request rather than the bare list of entries.
        return putRecordsRequest;

    }

}

Solution

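  • I believe the problem is here: handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));. All of your messages carry the same foo header, so they all belong to the same message group. Once that group has been released it is marked complete but, by default, stays in the message store (expireGroupsUponCompletion is false), so every later message with the same correlation key is treated as a late arrival and discarded. To let such messages form a new group, have the completed group removed, for example with handler.setExpireGroupsUponCompletion(true).

    Please review the aggregator documentation to familiarize yourself with all the possible behaviors: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator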
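
To make the discard behavior concrete, here is a minimal plain-Java sketch. It is a toy model, not Spring Integration code: ToyAggregator and all of its members are hypothetical names invented for illustration, and only the flag name is borrowed from the handler property. It assumes a release count of 1, as in the question's test setup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: this is NOT how Spring Integration is implemented.
// Every name here is hypothetical and exists purely for illustration.
public class ToyAggregator {
    private final int releaseCount;                    // release a group once it holds this many messages
    private final boolean expireGroupsUponCompletion;  // forget a group as soon as it has been released?
    private final Map<String, List<String>> openGroups = new HashMap<>();
    private final Set<String> completedKeys = new HashSet<>();

    public final List<List<String>> released = new ArrayList<>();
    public final List<String> discarded = new ArrayList<>();

    public ToyAggregator(int releaseCount, boolean expireGroupsUponCompletion) {
        this.releaseCount = releaseCount;
        this.expireGroupsUponCompletion = expireGroupsUponCompletion;
    }

    public void handle(String correlationKey, String payload) {
        if (completedKeys.contains(correlationKey)) {
            // The group was already released and is still remembered as complete,
            // so this message counts as a late arrival and is dropped.
            discarded.add(payload);
            return;
        }
        List<String> group = openGroups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= releaseCount) {
            released.add(new ArrayList<>(group));
            openGroups.remove(correlationKey);
            if (!expireGroupsUponCompletion) {
                // Keep a marker for the completed group; later messages with this key are discarded.
                completedKeys.add(correlationKey);
            }
        }
    }

    public static void main(String[] args) {
        // Same correlation key twice, completed group kept.
        ToyAggregator keep = new ToyAggregator(1, false);
        keep.handle("foo", "first");
        keep.handle("foo", "second");
        System.out.println(keep.released.size() + " released, " + keep.discarded.size() + " discarded");

        // Same correlation key twice, completed group removed after release.
        ToyAggregator expire = new ToyAggregator(1, true);
        expire.handle("foo", "first");
        expire.handle("foo", "second");
        System.out.println(expire.released.size() + " released, " + expire.discarded.size() + " discarded");
    }
}
```

Under this model the first run prints "1 released, 1 discarded" and the second prints "2 released, 0 discarded". The first case matches the symptom in the question: only the first message with a given foo header ever reaches the output channel.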