java spring-integration sftp

Spring Integration: how to dynamically set the local/remote directory based on the message, to either send files to a remote SFTP server or get files from it


I'm currently trying to make changes to a project that uses Spring Integration to send files to and receive files from an SFTP server.

The changes consist of adding other SFTP servers, choosing the right server based on conditions, and handling the files the same way.

I'm currently struggling with two things.

The first :

I have a channel whose messages contain a header with the remote directory, and I need to access the right SFTP session. But to create the session, I need the right properties (either config1 or config2, which are defined in my application.yml). I'm not sure how I can pass this information to my ServiceActivator. (Second TODO in my code)

The second :

I need to retrieve files from multiple SFTP servers and save those files in multiple local directories. The remote and local paths are not the same; they are defined in the properties config1 and config2, as described in my first problem. I think I'm on the right track with delegating the SFTP session, but I don't know how to set the localDirectory based on the SFTP session. (First TODO in my code)

If someone could help me a little, I'd appreciate it a lot.

Thanks in advance.

Here is my code so far :

    SftpConfig.Sftp1 sftpConfig1;
    SftpConfig.Sftp2 sftpConfig2;

    @Bean
    @BridgeTo
    public MessageChannel uploadChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public ExpressionParser spelExpressionParser() {
        return new SpelExpressionParser();
    }

    public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(sftp.getHost());
        factory.setPort(sftp.getPort());
        factory.setUser(sftp.getUser());
        factory.setPassword(sftp.getPassword());
        factory.setAllowUnknownKeys(true);
        factory.setTimeout(sftp.getTimeout());
        log.info("timeout is set to: {}", sftp.getTimeout());

        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {

        Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
        mapSession.put("config1", getSftpSession(sftpConfig1));
        mapSession.put("config2", getSftpSession(sftpConfig2));

        SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession);

        return new DelegatingSessionFactory<>(sessionFactoryLocator);
    }

    @Bean
    public RotatingServerAdvice advice() {

        List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList());

        keyDirectories.addAll(sftpConfig2.getCodes().stream()
                .map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
                .collect(Collectors.toList()));

        return new RotatingServerAdvice(delegatingSf(), keyDirectories);
    }

    @Bean
    public IntegrationFlow sftpIntegrationFlow() {
        return IntegrationFlows.from(
                        Sftp.inboundAdapter(delegatingSf())
                                .filter(new SftpSimplePatternFileListFilter("*.csv"))
                                .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                                .localFilter(new AbstractFileListFilter<File>() {
                                    @Override
                                    public boolean accept(final File file) {
                                        return file.getName().endsWith(".csv");
                                    }
                                })
                                .deleteRemoteFiles(false)
                                .temporaryFileSuffix(".new")
                                .localDirectory(new File()) // TODO dynamic local directory based on sftp session
                                .remoteDirectory("."),
                                    e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
                .log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
                .channel("filesReceptionChannel")
                .enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
                .get();
    }

    @Bean
    public MethodInterceptor logNoFileFoundAdvice() {
        return invocation -> {
            Object result = invocation.proceed();
            if (result == null) {
                log.info("[SFTP] No files found");
            }
            return result;
        };
    }

    @Bean
    public SftpRemoteFileTemplate sftpTemplate() {
        return new SftpRemoteFileTemplate(sftpSession());
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSession() {
        return getSftpSession(); // TODO dynamic sftp session based on message received in serviceActivator below
    }

    @Bean
    @ServiceActivator(inputChannel = "uploadChannel")
    public MessageHandler uploadHandler() {
        return getFtpMessageHandler(sftpSession());
    }

    public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
        handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
        handler.setFileNameGenerator(message -> {
            if (message.getPayload() instanceof File) {
                return ((File) message.getPayload()).getName();
            } else {
                throw new IllegalArgumentException("File expected as payload.");
            }
        });
        handler.setUseTemporaryFileName(false);
        return handler;
    }

EDIT :

I managed to fix the first part with your help, so thanks a lot.

To fix it, I first added a default factory in the delegatingSf() method.

Then, I used delegatingSf() in the serviceActivator instead of sftpSession(), which I removed.

Finally, in the method that sends the message to my uploadChannel, I used delegatingSf.setThreadKey(mapSessionKey) to choose the right factory dynamically, and of course delegatingSf.clearThreadKey() just after sending.
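
The set/clear sequence described above can leave a stale key on the thread if the send throws before the clear runs. Here is a minimal, stand-alone sketch of wrapping it in try/finally. To keep it runnable without Spring, `ThreadKeyHolder` and `KeyedSender` are hypothetical stand-ins (not Spring classes) that only model a thread-bound key with a default fallback, and the "factories" are simplified to plain strings.

```java
import java.util.Map;

/** Hypothetical stand-in for a thread-bound factory key (NOT Spring code). */
final class ThreadKeyHolder {
    private final ThreadLocal<String> threadKey = new ThreadLocal<>();
    private final Map<String, String> factories; // key -> factory, simplified to a String
    private final String defaultFactory;

    ThreadKeyHolder(Map<String, String> factories, String defaultFactory) {
        this.factories = factories;
        this.defaultFactory = defaultFactory;
    }

    void setThreadKey(String key) {
        threadKey.set(key);
    }

    void clearThreadKey() {
        threadKey.remove();
    }

    /** Resolves the factory for the current thread; uses the default when no key matches. */
    String currentFactory() {
        String key = threadKey.get();
        return key == null ? defaultFactory : factories.getOrDefault(key, defaultFactory);
    }
}

final class KeyedSender {
    /** Binds the key, runs the send, and always clears the key, even if the send throws. */
    static void sendWithKey(ThreadKeyHolder holder, String key, Runnable send) {
        holder.setThreadKey(key);
        try {
            send.run();
        } finally {
            holder.clearThreadKey();
        }
    }
}
```

In the real flow, the same shape would apply with delegatingSf.setThreadKey(mapSessionKey) before the send to uploadChannel and delegatingSf.clearThreadKey() in the finally block. This assumes the send and the SFTP handler run on the same thread.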

I'll try to get the second part working, and I'll post the fixed code afterwards.

EDIT 2 :

I used the localFilenameExpression as suggested to resolve the localDirectory dynamically, but I'm not happy with it, as in my opinion it's not really a good way to do it. I used the remoteDirectory to identify the SFTP server, so if two servers one day have the same path, it will no longer work. But it works for now, so thank you for your help.

SftpConfig.Sftp1 sftpConfig1;
SftpConfig.Sftp2 sftpConfig2;

@Bean
@BridgeTo
public MessageChannel uploadChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public ExpressionParser spelExpressionParser() {
    return new SpelExpressionParser();
}

public SessionFactory<ChannelSftp.LsEntry> getSftpSession(SftpConfig sftp) {

    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(sftp.getHost());
    factory.setPort(sftp.getPort());
    factory.setUser(sftp.getUser());
    factory.setPassword(sftp.getPassword());
    factory.setAllowUnknownKeys(true);
    factory.setTimeout(sftp.getTimeout());
    log.info("timeout is set to: {}", sftp.getTimeout());

    return new CachingSessionFactory<>(factory);
}

@Bean
public DelegatingSessionFactory<ChannelSftp.LsEntry> delegatingSf() {

    Map<Object, SessionFactory<ChannelSftp.LsEntry>> mapSession = new HashMap<>();
    mapSession.put("config1", getSftpSession(sftpConfig1));
    mapSession.put("config2", getSftpSession(sftpConfig2));

    SessionFactoryLocator<ChannelSftp.LsEntry> sessionFactoryLocator = new DefaultSessionFactoryLocator<>(mapSession, mapSession.get("config1"));

    return new DelegatingSessionFactory<>(sessionFactoryLocator);
}

@Bean
public RotatingServerAdvice advice() {

    List<RotationPolicy.KeyDirectory> keyDirectories = sftpConfig1.getCodes().stream()
            .map(code -> new RotationPolicy.KeyDirectory("config1", sftpConfig1.getReaderDirectory() + SEPARATOR + code))
            .collect(Collectors.toList());

    keyDirectories.addAll(sftpConfig2.getCodes().stream()
            .map(code -> new RotationPolicy.KeyDirectory("config2", sftpConfig2.getReaderDirectory() + SEPARATOR + code))
            .collect(Collectors.toList()));

    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

@Bean
public IntegrationFlow sftpIntegrationFlow() {
    return IntegrationFlows.from(
                    Sftp.inboundAdapter(delegatingSf())
                            .filter(new SftpSimplePatternFileListFilter("*.csv"))
                            .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                            .localFilter(new AbstractFileListFilter<File>() {
                                @Override
                                public boolean accept(final File file) {
                                    return file.getName().endsWith(".csv");
                                }
                            })
                            .deleteRemoteFiles(false)
                            .temporaryFileSuffix(".new")
                            .localFilenameExpression("@sftpEIPConfig.getLocalDirectoryReader(#remoteDirectory) + #this")
                            .localDirectory(new File("/"))
                            .remoteDirectory("."),
                                e -> e.poller(Pollers.fixedDelay(1, MINUTES).advice(advice()).advice(logNoFileFoundAdvice())))
            .log(LoggingHandler.Level.INFO, "[SFTP]", m -> "Received file: " + m.getHeaders().get(FileHeaders.FILENAME))
            .channel("filesReceptionChannel")
            .enrichHeaders(h -> h.header("errorChannel", "errorChannel"))
            .get();
}

@Bean
public MethodInterceptor logNoFileFoundAdvice() {
    return invocation -> {
        Object result = invocation.proceed();
        if (result == null) {
            log.info("[SFTP] No files found");
        }
        return result;
    };
}

@Bean
public SftpRemoteFileTemplate sftpTemplate() {
    return new SftpRemoteFileTemplate(delegatingSf());
}

@Bean
@ServiceActivator(inputChannel = "uploadChannel")
public MessageHandler uploadHandler() {
    return getFtpMessageHandler(delegatingSf());
}

public MessageHandler getFtpMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSession) {
    SftpMessageHandler handler = new SftpMessageHandler(sftpSession);
    handler.setRemoteDirectoryExpressionString("headers['remoteDirectory']");
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof File) {
            return ((File) message.getPayload()).getName();
        } else {
            throw new IllegalArgumentException("File expected as payload.");
        }
    });
    handler.setUseTemporaryFileName(false);
    return handler;
}
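
The getLocalDirectoryReader method referenced in the localFilenameExpression is not shown above. The sketch below is one possible shape for it, not the actual implementation: it is a plain lookup from a remote directory prefix to a local directory. The class name `LocalDirectoryResolver`, the `register` method, and every path used with it are hypothetical. It returns the local directory with a trailing separator so that appending the file name (`+ #this` in the expression) gives a full path.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical lookup from a remote reader directory to a local target directory. */
final class LocalDirectoryResolver {
    private final Map<String, String> localByRemotePrefix = new LinkedHashMap<>();

    /** Registers a remote directory prefix and the local directory its files go to. */
    void register(String remotePrefix, String localDirectory) {
        localByRemotePrefix.put(remotePrefix, localDirectory);
    }

    /** Returns the local directory (ending in '/') of the longest matching remote prefix. */
    String getLocalDirectoryReader(String remoteDirectory) {
        String best = null;
        for (String prefix : localByRemotePrefix.keySet()) {
            // Match the prefix itself or any sub-directory of it, not just a shared string start.
            boolean matches = remoteDirectory.equals(prefix)
                    || remoteDirectory.startsWith(prefix + "/");
            if (matches && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("No local directory for " + remoteDirectory);
        }
        String local = localByRemotePrefix.get(best);
        return local.endsWith("/") ? local : local + "/";
    }
}
```

Because the lookup only sees the remote directory, it has the limitation mentioned in EDIT 2: two servers that use the same remote path cannot be told apart.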

Solution

  • The localDirectory cannot be changed on the AbstractInboundFileSynchronizingMessageSource. You may consider using localFilenameExpression() to build the directory dynamically, while localDirectory can be just the root: /.

    For sending to different SFTP servers, you definitely have to use DelegatingSessionFactory. See more info in the docs: https://docs.spring.io/spring-integration/reference/sftp/dsf.html. And also: https://docs.spring.io/spring-integration/reference/handler-advice/context-holder.html