I have a StandardIntegrationFlow that reads files from a set of folders on an SFTP server, filters them, and then processes the ones that pass the filter. My files are laid out like this: /home/{userId}/files/...
@Bean
public StandardIntegrationFlow readingFilesAndProcess(
        SessionFactory<ChannelSftp.LsEntry> sessionFactory,
        PollerMetadata sftpStreamingPoller,
        RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate,
        FileeValidator fileValidator) {
    return IntegrationFlows.from(
                    Sftp.inboundStreamingAdapter(recursiveReceiptsRemoteFileTemplate)
                            .remoteDirectory("/home"),
                    e -> e.poller(sftpStreamingPoller).autoStartup(false))
            .filter(Message.class, fileValidator::isFileValid)
            .handle(
                    message -> {
                        log.info("Handling message at: {}", LocalDateTime.now());
                        handlingFile(message);
                    })
            .get();
}
I have created a recursive RemoteFileTemplate and I can successfully parse all the files for all the users.
@Bean
public RecursiveSftpRemoteFileTemplate recursiveReceiptsRemoteFileTemplate(
        SessionFactory<ChannelSftp.LsEntry> sessionFactory,
        SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter) {
    final RecursiveSftpRemoteFileTemplate template =
            new RecursiveSftpRemoteFileTemplate(
                    sessionFactory,
                    1,
                    file -> sftpPersistentAcceptOnceFileListFilter.accept(file),
                    List.of("files"));
    template.setRemoteDirectoryExpression(new LiteralExpression("/home"));
    return template;
}
To trigger the integration flow I am using this function:
@Scheduled(cron = "0 0 0 * * *", zone = "ECT")
public void triggeringFlow() {
    log.info("Triggering the IntegrationFlow");
    readingFilesAndProcess.start();
}
My issue is that the first time the readingFilesAndProcess flow is triggered, it parses all the files, but the second, third, ... time it doesn't do anything (I checked the logs).
Is there a way to "stop" the flow once it has parsed all the files, so that the next time it is triggered it goes through all the files again?
I am using spring-integration 5.5.15.
You don't need that @Scheduled approach, and you don't need to stop or restart your Inbound Channel Adapter. It is enough to configure that poller with a cron expression so that it runs once a day.
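A cron-driven poller along those lines might look like the sketch below. It is only an illustration under a few assumptions: the class name SftpPollerConfig is hypothetical, the "Europe/Paris" zone is assumed to be what the legacy "ECT" ID in the question stands for, and maxMessagesPerPoll(-1) is an assumption added so that a single daily poll keeps reading until the source has no more files, rather than stopping after one message.

```java
import java.util.TimeZone;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.scheduling.PollerMetadata;

// Hypothetical configuration class; only the bean name matches the question.
@Configuration
public class SftpPollerConfig {

    @Bean
    public PollerMetadata sftpStreamingPoller() {
        return Pollers
                // Same cron expression as the @Scheduled method: every day at midnight.
                // Assumption: "Europe/Paris" is the zone meant by "ECT" in the question.
                .cron("0 0 0 * * *", TimeZone.getTimeZone("Europe/Paris"))
                // Assumption: a negative value lets one poll cycle emit messages
                // until the source returns nothing, instead of a single message per poll.
                .maxMessagesPerPoll(-1)
                .get();
    }
}
```

With a poller like this, the adapter can be left to start with the application (that is, without autoStartup(false)), and the triggeringFlow() method is no longer needed.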