Tags: java, spring, spring-integration, spring-xd

How to send messages on a "side channel" in Spring XD?


I'd like to achieve something like this:

[Source] --> [Splitter] --B--> [Processor] --B-->  [Aggregator] ---> [Sink]
              |                                        ^
              |                                        |
              |--------------------C-------------------|

After several processors which include business logic, a special producer should emit several messages of type B along with a control message C.

Processors with further business logic will work on those messages.

An Aggregator should "collect" all the results, along with the control message.

The control message would essentially carry the number of generated messages (B), so the Aggregator can tell whether

  1. all expected messages of type B have arrived, or
  2. a timeout has occurred.

I know that the Aggregator can deduce the number of messages to expect from the splitter by using the message headers, but will it also time out if no message ever reaches it?

Generally speaking: is it possible to have Transformers with more than one input or output channel?


Solution

  • Point 2 (the timeout) first:

    The standard aggregator module has a timeout parameter (defaults to 50 seconds).

    It is implemented with a message group store reaper that runs once per timeout interval, so the actual timeout can be up to 2 x timeout, with an average of 1.5 x timeout.
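
    The arithmetic behind those figures can be checked with a small plain-Java model. This is only a sketch, not Spring XD code: it assumes the reaper fires at exact multiples of the timeout and expires any group whose age has reached the timeout. The class and method names are made up for illustration.

    ```java
    // Plain-Java model of reaper-based expiry; an illustration, not Spring XD code.
    final class ReaperTimeoutSketch {

        /**
         * Assumes the reaper runs at 0, T, 2T, ... and expires any group whose age is >= T.
         * Returns how long a group created at createdAtMs waits before it is expired.
         */
        static long effectiveTimeoutMs(long createdAtMs, long timeoutMs) {
            long earliestExpiry = createdAtMs + timeoutMs;
            // Index of the first reaper run at or after the earliest expiry (ceiling division).
            long runs = (earliestExpiry + timeoutMs - 1) / timeoutMs;
            return runs * timeoutMs - createdAtMs;
        }

        /** Averages the wait over creation times spread evenly across one reaper cycle. */
        static double averageTimeoutMs(long timeoutMs) {
            long total = 0;
            for (long createdAt = 0; createdAt < timeoutMs; createdAt++) {
                total += effectiveTimeoutMs(createdAt, timeoutMs);
            }
            return (double) total / timeoutMs;
        }

        public static void main(String[] args) {
            long timeoutMs = 50_000; // the 50-second default mentioned above
            System.out.println("created exactly on a reaper run: " + effectiveTimeoutMs(0, timeoutMs));
            System.out.println("created just after a reaper run: " + effectiveTimeoutMs(1, timeoutMs));
            System.out.println("average over one reaper cycle:   " + averageTimeoutMs(timeoutMs));
        }
    }
    ```

    A group created right after a reaper run is still too young at the next run and is only expired one run later, which is where the factor of almost 2 comes from.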

    The aggregator now has a group-timeout property, which is more accurate than using a reaper; using it would require a custom aggregator processor. It also has a group-timeout-expression, so the timeout can vary with runtime conditions (e.g. the current group size).

    Now point 1:

    There is no standard mechanism to send side-band data from one module to another. Generally, modules communicate with each other by setting message headers. That's how the standard splitter sends information to the aggregator (sequenceNumber and sequenceSize headers).

    You can create a custom aggregator processor with a custom ReleaseStrategy to use other headers.
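
    As a rough illustration of such a release decision, here is a plain-Java sketch. It does not use the Spring Integration types: in a real module the logic would sit in a `ReleaseStrategy` implementation (`canRelease(MessageGroup)`). The `Msg` class and the header names `messageType` and `expectedCount` are hypothetical, invented for this example. It also assumes the control message C travels in-band on the same stream as the B messages and shares their correlation key, so that it lands in the same group.

    ```java
    import java.util.List;
    import java.util.Map;

    // Plain-Java model of a header-driven release decision; not the Spring Integration API.
    final class ControlMessageReleaseSketch {

        /** Minimal stand-in for a message: only the headers matter here. */
        static final class Msg {
            final Map<String, Object> headers;

            Msg(Map<String, Object> headers) {
                this.headers = headers;
            }
        }

        // Hypothetical header names, chosen for this sketch only.
        static final String TYPE = "messageType";
        static final String EXPECTED = "expectedCount";

        /** Release once C has arrived and at least as many B messages as it announces. */
        static boolean canRelease(List<Msg> group) {
            Integer expected = null;
            int received = 0;
            for (Msg m : group) {
                if ("C".equals(m.headers.get(TYPE))) {
                    expected = (Integer) m.headers.get(EXPECTED);
                } else if ("B".equals(m.headers.get(TYPE))) {
                    received++;
                }
            }
            return expected != null && received >= expected;
        }

        public static void main(String[] args) {
            Msg control = new Msg(Map.of(TYPE, "C", EXPECTED, 2));
            Msg b = new Msg(Map.of(TYPE, "B"));
            System.out.println(canRelease(List.of(b, b)));          // false: C has not arrived yet
            System.out.println(canRelease(List.of(control, b)));    // false: one B is still missing
            System.out.println(canRelease(List.of(b, control, b))); // true: C plus both Bs are present
        }
    }
    ```

    Because the decision depends only on what is in the group, the order in which C and the B messages arrive does not matter. A group that never completes would still need the timeout described above.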

    Generally speaking: is it possible to have Transformers with more than one input or output channel?

    Not with XD, but the follow-on project (spring-cloud-stream) supports binding multiple inputs and outputs to apps.