spring-integrationenterprise-integrationcontrol-bus

Using Control Bus EIP in Spring Integration to start/stop channels dynamically


I am interested in using Spring Integration to fetch files from various endpoints (FTP servers, email inboxes, S3, etc.) and load them into my system (essentially, ETL).

There are times when I will want these channels active and running, and other times when I will want them paused/stopped. Meaning, even if there are files available at the source, I do not want the channel consuming the data and doing anything with it.

Is a control bus an appropriate start/stop solution here:

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlow.from("controlBus")
              .controlBus()
              .get();
}

If so, how would I stop/restart a specific channel (route between an S3 bucket and the rest of my system) using the Java DSL/API? And if not, then what is the recommended practice/EIP to apply here?


Solution

  • Yes, the Control Bus is exactly a pattern and tool designed for your goal: https://www.enterpriseintegrationpatterns.com/ControlBus.html.

    Yes, to use it you need to send messages to input channel of that control bus endpoint. The payload of message to sent must be a command to do some control activity for endpoint. Typically we call start and stop.

    So, let's imagine you have an S3 source polling channel adapter:

    @Bean
    IntegrationFlow s3Flow(S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
        return IntegrationFlow.from(s3InboundFileSynchronizingMessageSource, e -> e.id("myS3SourceEndpoint"))
              ...;
    }
    

    So, to stop that myS3SourceEndpoint via Control Bus, you need to send a message with a payload @myS3SourceEndpoint.stop().

    Pay attention that we don't talk here about message channels neither message sources. The active components in the flow are really endpoints.

    UPDATE

    The Control Bus component utilizes a Command Message pattern. So, you need to build a respective message and send it to the input channel of that control bus endpoint. Something like this is OK:

    @Autowired
    MessageChannel controlBus;
    
    ...
    this.controlBus.send(new GenericMessage<>("@myS3SourceEndpoint.stop()"));
    

    You can use a MessagingTemplate.convertAndSend() if you don't like creating message yourself. Or you also can expose high-lever API via @MessagingGateway interface.

    Everything you can find in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/index.html