
How to delete a remote file after it is processed with Sftp.inboundStreamingAdapter


I am currently processing .csv files from a remote SFTP server, transforming them and placing them on a Kafka topic. I know we can delete the files after they are processed when using Sftp.inboundAdapter. Is there a way to get the same functionality with Sftp.inboundStreamingAdapter? Do I need to add another flow, or is there some way to delete the files using the properties the adapter provides?

I can provide more code for the entire flow if needed. Thanks

    @Bean
    public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
                                                IntegrationFlowProperties properties,
                                                MessageChannel inboundFilesMessageChannel) {

        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sftpSessionFactory))
                          .filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
                          .remoteDirectory(properties.getRemoteDirectory()),
                      e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
                .log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
                     "headers['file_remoteDirectory'] + T(java.io.File).separator + headers['file_remoteFile']")
                .channel(inboundFilesMessageChannel)
                .get();
    }

EDIT:

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {
        
        return IntegrationFlow.from(inboundFilesMessageChannel)
                              .split(Files.splitter()
                                          .markers(true)
                                          .charset(StandardCharsets.UTF_8)
                                          .firstLineAsHeader("myHeaders")
                                          .applySequence(true))
                              // .transform(new StreamToEmailInteractionConverter()) // TODO: Figure this out so it doesn't get put on topic as .csv
                              .transform(new ObjectToJsonTransformer())
                              .log(LoggingHandler.Level.DEBUG,
                                   "DataSftpToKafkaIntegrationFlow",
                                   m -> "Payload: " + m.getPayload())
                              .channel(kafkaPojoMessageChannel)
                              .get();
    }

    @Bean
    public IntegrationFlow publishToKafkaFlow(KafkaTemplate<String, String> kafkaTemplate,
                                              MessageChannel kafkaProducerErrorRecordChannel,
                                              QueueChannel kafkaPojoMessageChannel) {

        return IntegrationFlow.from(kafkaPojoMessageChannel)
                              .log(LoggingHandler.Level.DEBUG,
                                   "DataSftpToKafkaIntegrationFlow", e -> "Payload: " + e.getPayload())
                              .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                           .topic(KAFKA_TOPIC),
                                      e -> e.id("KafkaProducer"))
                              .routeByException(r -> r
                                      .channelMapping(KafkaProducerException.class, kafkaProducerErrorRecordChannel)
                                      .defaultOutputChannel("errorChannel"))
                              .get();
    }

After this entire flow completes, I would like to delete the file we just processed from the remote SFTP server.


Solution

  • The AbstractInboundFileSynchronizingMessageSource works in two phases: it copies remote files into a local directory, and then emits messages for those local copies. The deleteRemoteFiles option belongs to the remote-to-local copy phase: once the files are copied, the remote originals are no longer needed and everything downstream deals only with the local copies.

    The streaming source opens an InputStream for each remote file and keeps it open until you close it manually. That's why there is no explicit option to delete remote files.

    You can remove the remote file with an SftpOutboundGateway using the rm command. This gateway could be a second subscriber to inboundFilesMessageChannel if you make that channel a PublishSubscribeChannel, as long as the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE from the message headers has already been closed by the time the rm runs.

    However, I think we could add an option like removeFileOnClose to AbstractRemoteFileStreamingMessageSource and perform the rm when the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE is closed.
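The removeFileOnClose idea can be illustrated in plain Java, without Spring Integration, as an InputStream wrapper that runs a removal callback only after the stream has been closed. This is a minimal sketch under stated assumptions, not the framework's API: RemoveOnCloseDemo, RemoveOnCloseInputStream and IoAction are hypothetical names, and an in-memory map stands in for the remote SFTP directory. In a real flow the callback would issue the rm instead, for example through the SftpOutboundGateway mentioned above.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class RemoveOnCloseDemo {

    /** Hypothetical removal callback; a real one would issue the remote rm. */
    @FunctionalInterface
    interface IoAction {
        void run() throws IOException;
    }

    /** Hypothetical wrapper: removes the source only after the stream is closed. */
    static final class RemoveOnCloseInputStream extends FilterInputStream {

        private final IoAction removeAction;
        private final AtomicBoolean closed = new AtomicBoolean();

        RemoveOnCloseInputStream(InputStream delegate, IoAction removeAction) {
            super(delegate);
            this.removeAction = removeAction;
        }

        @Override
        public void close() throws IOException {
            // Only the first close() does any work, so the rm is issued at most once.
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            super.close();      // release the stream first; if this throws, nothing is removed
            removeAction.run(); // then remove the (stand-in) remote file
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: an in-memory map plays the role of the remote SFTP directory.
        Map<String, byte[]> remote = new HashMap<>();
        String path = "/in/data.csv";
        remote.put(path, "id,name\n1,alice\n".getBytes(StandardCharsets.UTF_8));

        try (InputStream in = new RemoveOnCloseInputStream(
                new ByteArrayInputStream(remote.get(path)),
                () -> remote.remove(path))) {
            String csv = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            System.out.println("Read " + csv.length() + " chars, still on remote: " + remote.containsKey(path));
        }
        System.out.println("After close, still on remote: " + remote.containsKey(path));
    }
}
```

One consequence of tying removal to close(): try-with-resources also closes the stream when processing throws, so in this sketch a failed read still removes the file. A real option would probably need to decide whether to delete only after successful processing.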