java, spring, spring-integration, spring-integration-sftp

How to trigger SFTP inbound channel in test


I found that an IntegrationFlow I had written with the Java DSL wasn't very testable, so I followed "Configuring with Java Configuration" and split it into @Bean definitions. In my unit test I use a third-party in-memory SFTP server; I start the inbound channel adapter and then call receive() on the channel. I had trouble working out which channel type to use, because channels are not mentioned anywhere in the SFTP Adapters documentation, but I eventually found what I think is the correct one (QueueChannel) in the testing examples repository. My problem is that the unit test hangs on the channel's receive() method. Through debugging I determined that the session factory's getSession() never gets called. What am I doing wrong?

    @Bean
    public PollableChannel sftpChannel() {
        return new QueueChannel();
    }

    @Bean
    @EndpointId("sftpInboundAdapter")
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000"))
    public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer);
        source.setLocalDirectory(new File("/local"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(6);
        return source;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(testSftpSessionFactory);
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setPreserveTimestamp(true);
        fileSynchronizer.setRemoteDirectory("/remote");
        List<String> filterFileNameList = List.of("1.txt");
        fileSynchronizer.setFilter(new FilenameListFilter(filterFileNameList));

        return fileSynchronizer;
    }

    @Bean
    private DefaultSftpSessionFactory testSftpSessionFactory(String username, String password, int port, String host) {
        DefaultSftpSessionFactory defaultSftpSessionFactory = new DefaultSftpSessionFactory();
        defaultSftpSessionFactory.setPassword("password");
        defaultSftpSessionFactory.setUser("username");
        defaultSftpSessionFactory.setHost("localhost");
        defaultSftpSessionFactory.setPort(777);
        defaultSftpSessionFactory.setAllowUnknownKeys(true);

        Properties config = new java.util.Properties();
        config.put( "StrictHostKeyChecking", "no" );
        defaultSftpSessionFactory.setSessionConfig(config);

        return defaultSftpSessionFactory;
    }

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {IntegrationFlowTestSupport.class, Synchronizer.class, Channel.class, Activator.class})
public class IntegrationFlowConfigTest {

    private static final String CONTENTS = "abcdef 1234567890";

    @Autowired
    PollableChannel sftpChannel;

    @Autowired
    DefaultSftpSessionFactory testSftpSessionFactory;

    @Autowired
    SftpInboundFileSynchronizer sftpInboundFileSynchronizer;

    @Autowired
    SftpInboundFileSynchronizingMessageSource sftpMessageSource;

    @Autowired
    SourcePollingChannelAdapter sftpInboundAdapter;

    @Test
    public void test() throws Exception {
        FileEntry f1 = new FileEntry("/remote/1.txt", CONTENTS);
        FileEntry f2 = new FileEntry("/remote/2.txt", CONTENTS);
        FileEntry f3 = new FileEntry("/remote/3.txt", CONTENTS);

        withSftpServer(server -> {
            server.setPort(777);
            server.addUser("username", "password");

            server.putFile(f1.getPath(), f1.createInputStream());
            server.putFile(f2.getPath(), f2.createInputStream());

            sftpInboundAdapter.start();
            Message<?> message = sftpChannel.receive();
        });
    }
}

Solution

  • First of all, it is wrong to rewrite your code to satisfy unit test expectations. We have spent far more than an hour thinking about how to keep testing concerns separate from production code. See the respective documentation: https://docs.spring.io/spring-integration/docs/current/reference/html/testing.html#test-context.

    For your use case it might be better to mock that @ServiceActivator instead of using a QueueChannel and a competing consumer in your test. What I mean is that you already have a consumer in your configuration: that @ServiceActivator. So there is no guarantee that your manual sftpChannel.receive() will give you a message from the queue, since the message could already have been consumed by your @ServiceActivator subscriber.

    The fixedDelay = "0" looks suspicious. Isn't that too frequent for asking the SFTP server for new files? How do you expect your system to stay stable if you put it under so much stress with such a short delay?

    We don't know what withSftpServer(server -> { is, and it is also not clear what testSftpSessionFactory is. So I am not sure yet how you start an SFTP server and connect to it from your code.

    I also see sftpMessageSource.start();, but nowhere in your code is it stopped. Plus, I guess you really meant to start the endpoint, not the source. The endpoint in your case is the SourcePollingChannelAdapter created for that @InboundChannelAdapter. You can use an @EndpointId if it is not autowired automatically by type.

    In our tests we use the Apache MINA SSH library: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/SftpTestSupport.java#L64-L76
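
The competing-consumer point above can be illustrated with a small plain-Java analogy. This is only a sketch and not Spring Integration code: a BlockingQueue stands in for the QueueChannel, a background thread stands in for the @ServiceActivator subscriber, and the class and method names are hypothetical. The "subscriber wins" ordering is forced with a latch so the result is deterministic; in a real test that ordering is a race. The sketch also uses a timed poll, which corresponds to Spring's PollableChannel.receive(long timeout), so a missing message shows up as null rather than as a hanging test.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy only: the queue plays the role of the QueueChannel and
// the background thread plays the role of the @ServiceActivator subscriber.
class CompetingConsumerSketch {

    /**
     * Sends one message, lets the background subscriber take it, then polls
     * from the calling ("test") thread with a timeout.
     * Returns null because the subscriber already consumed the only message.
     */
    static String receiveAfterSubscriberWon(long timeoutMillis) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        CountDownLatch consumed = new CountDownLatch(1);

        Thread subscriber = new Thread(() -> {
            try {
                channel.take();        // blocks until a message arrives
                consumed.countDown();  // signal that the message is gone
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        subscriber.setDaemon(true);
        subscriber.start();

        try {
            channel.put("1.txt");
            // Force the unlucky ordering so the outcome is deterministic;
            // in a real test this ordering is a race.
            consumed.await();
            // A timed poll returns null instead of blocking forever.
            return channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(receiveAfterSubscriberWon(200)); // prints "null"
    }
}
```

If the test thread must be the one to observe the message, the other consumer has to be removed or replaced for the test, which is the reason for suggesting a mock of the @ServiceActivator rather than a second reader on the same queue.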