java spring spring-boot spring-integration spring-integration-sftp

Spring Integration: Handle messages from Sftp.inboundStreamingAdapter in parallel


I have a Spring Integration 5.5 / Spring Boot 2.7 flow that reads XML files from an SFTP source, processes and stores them, then deletes them. It works, but sequentially: each file is processed only after the previous one has finished.

I'd like to do the processing and transforming asynchronously (not necessarily in parallel; order is of no importance) to increase the throughput. But I can't figure out how to configure it.

The code:

IntegrationFlows
  .from(Sftp.inboundStreamingAdapter(sftpTemplate))
  .publishSubscribeChannel(spec -> spec
    .subscribe(
      flow -> 
        flow
         .transform( /* file content to xml */ )
         .handle( /* persist as xml */ )
     )
    .subscribe(flow -> flow.handle( /* remove file */ ))
   )
  .get()

I guess the async / parallel handling must be defined between the source and the pub/sub-channel. But how?


Solution

  • Your publishSubscribeChannel() configuration is correct. Don't try to make that channel parallel: after Sftp.inboundStreamingAdapter() the payload is an InputStream, so the remote file cannot be removed (or even closed) while another subscriber is still reading it. To make the flow parallel from the source side, look at the second argument of IntegrationFlows.from():

    static IntegrationFlowBuilder from(MessageSource<?> messageSource,
            @Nullable Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
    

    That Consumer<SourcePollingChannelAdapterSpec> gives you access to the source polling channel adapter options. One of them is poller(Function<PollerFactory, PollerSpec> pollers), which can be configured like this:

    e -> e.poller(p -> p.fixedDelay(1000).taskExecutor(/* your Executor */))
    

    That taskExecutor shifts each scheduled poll task to a different thread. Be careful with maxMessagesPerPoll (1 by default for SourcePollingChannelAdapter): if it is more than 1, that many messages are emitted one after another in the same thread.
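To make the maxMessagesPerPoll caveat concrete, here is a minimal plain-Java model of it. It uses only java.util.concurrent, not Spring Integration classes; the class name PollerHandoffSketch, the run method, and the file names are made up for illustration. Each "poll cycle" is handed to an executor and emits up to maxMessagesPerPoll messages sequentially on whichever thread runs that cycle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PollerHandoffSketch {

    /**
     * Simulates `cycles` poll cycles handed off to a task executor.
     * Returns, per worker thread name, how many files that thread handled.
     */
    static Map<String, Integer> run(List<String> files, int cycles, int maxMessagesPerPoll) {
        Queue<String> source = new ConcurrentLinkedQueue<>(files);
        Map<String, Integer> handledPerThread = new ConcurrentHashMap<>();
        // A fixed pool starts a new thread per submitted task until it reaches
        // its size, so with pool size == cycles every cycle gets its own thread.
        ExecutorService taskExecutor = Executors.newFixedThreadPool(cycles);
        for (int c = 0; c < cycles; c++) {
            taskExecutor.execute(() -> {
                // One poll cycle: up to maxMessagesPerPoll messages, one after
                // another, all on the thread that runs this cycle.
                for (int i = 0; i < maxMessagesPerPoll; i++) {
                    String file = source.poll();
                    if (file == null) {
                        return; // source is empty, end the cycle early
                    }
                    handledPerThread.merge(Thread.currentThread().getName(), 1, Integer::sum);
                }
            });
        }
        taskExecutor.shutdown();
        try {
            taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for poll cycles", ex);
        }
        return new TreeMap<>(handledPerThread);
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.xml", "b.xml", "c.xml", "d.xml", "e.xml", "f.xml");
        // Six cycles of one message each: every file lands on its own thread.
        System.out.println("maxMessagesPerPoll=1: " + run(files, 6, 1));
        // Two cycles of three messages each: three files share each thread.
        System.out.println("maxMessagesPerPoll=3: " + run(files, 2, 3));
    }
}
```

In this model the six files are spread over six threads when maxMessagesPerPoll is 1, but over only two threads when it is 3, which is why a larger value reduces the parallelism you get from the poller's executor alone.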

    You can also simply add a channel(c -> c.executor(/* your Executor */)) right after this from().
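Putting both options into the flow from the question could look roughly like the sketch below, written against Spring Integration 5.5. Several things here are assumptions and not from the original post: the class name SftpFlowConfig, the sftpExecutor bean and its pool size, the use of Transformers.fromStream("UTF-8") for the "file content to xml" step, and the persist and removeRemoteFile stubs, which only stand in for the elided handlers.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.messaging.Message;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class SftpFlowConfig {

    // Hypothetical pool; size it to what the SFTP server and the store can take.
    @Bean
    public ThreadPoolTaskExecutor sftpExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("sftp-flow-");
        return executor;
    }

    @Bean
    public IntegrationFlow sftpFlow(SftpRemoteFileTemplate sftpTemplate) {
        return IntegrationFlows
                .from(Sftp.inboundStreamingAdapter(sftpTemplate),
                        // Option 1: each poll cycle runs on a pool thread.
                        e -> e.poller(p -> p.fixedDelay(1000)
                                .maxMessagesPerPoll(1)
                                .taskExecutor(sftpExecutor())))
                // Option 2 (instead of taskExecutor above): hand off after the poll.
                // .channel(c -> c.executor(sftpExecutor()))
                .publishSubscribeChannel(spec -> spec
                        // Subscribers stay sequential: persist first, then remove.
                        .subscribe(flow -> flow
                                .transform(Transformers.fromStream("UTF-8"))
                                .handle(String.class, (xml, headers) -> {
                                    persist(xml);
                                    return null; // one-way: no reply message
                                }))
                        .subscribe(flow -> flow.handle(message -> removeRemoteFile(message))))
                .get();
    }

    // Placeholder for the "persist as xml" step from the question.
    private void persist(String xml) {
    }

    // Placeholder for the "remove file" step from the question.
    private void removeRemoteFile(Message<?> message) {
    }
}
```

The publishSubscribeChannel() itself is left without an executor, so within one message the remove step still runs only after the persist step has returned; the parallelism comes purely from several messages being in flight on different pool threads.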