I am interested in using Spring Integration to fetch files from various endpoints (FTP servers, email inboxes, S3, etc.) and load them into my system (essentially, ETL).
There are times when I will want these channels active and running, and other times when I will want them paused/stopped. Meaning, even if there are files available at the source, I do not want the channel consuming the data and doing anything with it.
Is a control bus an appropriate start/stop solution here:
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
If so, how would I stop/restart a specific channel (route between an S3 bucket and the rest of my system) using the Java DSL/API? And if not, then what is the recommended practice/EIP to apply here?
Yes, the Control Bus is exactly a pattern and tool designed for your goal: https://www.enterpriseintegrationpatterns.com/ControlBus.html.
Yes, to use it you need to send messages to input channel of that control bus endpoint. The payload of message to sent must be a command to do some control activity for endpoint. Typically we call start and stop.
So, let's imagine you have an S3 source polling channel adapter:
@Bean
IntegrationFlow s3Flow(S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
return IntegrationFlow.from(s3InboundFileSynchronizingMessageSource, e -> e.id("myS3SourceEndpoint"))
...;
}
So, to stop that myS3SourceEndpoint
via Control Bus, you need to send a message with a payload @myS3SourceEndpoint.stop()
.
Pay attention that we don't talk here about message channels neither message sources. The active components in the flow are really endpoints.
UPDATE
The Control Bus component utilizes a Command Message pattern. So, you need to build a respective message and send it to the input channel of that control bus endpoint. Something like this is OK:
@Autowired
MessageChannel controlBus;
...
this.controlBus.send(new GenericMessage<>("@myS3SourceEndpoint.stop()"));
You can use a MessagingTemplate.convertAndSend()
if you don't like creating message yourself. Or you also can expose high-lever API via @MessagingGateway
interface.
Everything you can find in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/index.html