kotlinspring-integration

How to detect end of IntegrationFlow processing after split and filter?


In Spring Integration, when creating IntegrationFlow, when event initiates a flow I have multiple operations, including splitter and filter. I would like to detect when all of the messages splitted has been processed.

IntegrationFlow
            .from(flux)
[... other operators ...]
            .split()
[... other operators ...]
            .filter<MailVertex?> ({ it != null }) { it.discardChannel(NullChannel()) }
[... other operators ...]
            .get()

Single event that triggers entire integration flow is a collection (let's assume, that it has 3 elements). Then, colletion is split, therefore 3 messages are flowing. After processing them, we perform filtering, and only two (out of three) are not null. Therefore, only two messages reaches end of the flow.

Flow can be processed in parallel, so we cannot assume anything about an order of messages.

How to run an action exactly once per initial event (collection of 3 elements) after processing all two unfiltered messages?

Normally, we can use splitter and then aggregator, which - based on correlation number, sequence number and sequence size will know if all messages were already processed or not. However, in my case, after splitting I have a filter. Therefore, not all messages that splitter produced will ever reach end of the stream.


Solution

  • You already use a discardChannel and it is totally normal to have that channel as an input for the final aggregator. It is also totally OK to have a channel() between endpoint in your flow definition. So, let's imaging in the end of your flow you do:

    .channel("aggregateInput")
    .aggregate()
    .get();
    

    so, it is OK to discard messages from the filter into that channel. Therefore:

    .filter<MailVertex?> ({ it != null }) { it.discardChannel("aggregateInput") }
    

    This way filtered message are just going directly to the aggregator bypassing the rest of the flow for good messages.

    You may do some CountDownLatch in header to wait for on the producer side, but I believe an aggregator is more natural messaging way.