spring-integrationspring-integration-dslspring-integration-file

Testing Spring Integration Flow


Actually I am creating a POC for running Spring integration With Kubernetes, and for that I created an integration flow that reads an XML file and move it to Processed Dir if it's a valid xml file otherwise move it to Error Dir

package com.stackoverflow.questions.config;


import static java.util.Arrays.asList;

import com.stackoverflow.questions.dto.WriteResult;
import com.stackoverflow.questions.handler.FileReaderHandler;
import com.stackoverflow.questions.handler.StudentErrorHandler;
import com.stackoverflow.questions.handler.StudentWriterHandler;
import com.stackoverflow.questions.service.DirectoryManagerService;
import com.stackoverflow.questions.transformer.FileToStudentTransformer;
import java.io.File;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class MainIntegrationFlow {

  @Value("${regex.filename.pattern}")
  private String regexFileNamePattern;

  @Value("${root.file.dir}")
  private String rootFileDir;

  @Value("${default.polling.rate}")
  private Long defaultPollingRate;

  private final DirectoryManagerService directoryManagerService;
  private final StudentErrorHandler studentErrorHandler;
  private final FileReaderHandler fileReaderHandler;
  private final StudentWriterHandler studentWriterHandler;
  private final FileToStudentTransformer fileToStudentTransformer;

  @Bean("mainStudentIntegrationFlow")
  public IntegrationFlow mainStudentIntegrationFlow(
      @Qualifier("mainFileReadingSourceMessage") MessageSource<File> mainFileReadingSourceMessage,
      @Qualifier("fileReaderChannel") MessageChannel fileReaderChannel) {
    return IntegrationFlows.from(mainFileReadingSourceMessage)
        .channel(fileReaderChannel)
        .handle(fileReaderHandler)
        .transform(fileToStudentTransformer)
        .handle(studentWriterHandler)
        .<WriteResult, Boolean>route(WriteResult::isWriten,
            mapping -> mapping
                .subFlowMapping(true, moveToProcessedDirFlow())
                .subFlowMapping(false, moveToErrorDirFlow()))
        .get();
  }


  public IntegrationFlow moveToProcessedDirFlow() {
    return flow -> flow.handle(message ->
        directoryManagerService
            .moveToProcessedDir(((WriteResult) message.getPayload()).getFilename()));
  }

  public IntegrationFlow moveToErrorDirFlow() {
    return flow -> flow.channel("studentErrorChannel")
        .handle(message ->
            directoryManagerService
                .moveToErrorDir(((WriteResult) message.getPayload()).getFilename()));
  }

  @Bean(name = "errorHandlerMainFlow")
  public IntegrationFlow errorHandlerMainFlow() {
    return IntegrationFlows.from("errorChannel")
        .handle(studentErrorHandler)
        .get();
  }

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata mainPollerMetadata() {
    return Pollers.fixedRate(defaultPollingRate, TimeUnit.SECONDS)
        .maxMessagesPerPoll(0)
        .get();
  }

  @Bean(name = "fileReaderChannel")
  public MessageChannel fileReaderChannel() {
    return MessageChannels.queue("fileReaderChannel").get();
  }

  @Bean("mainDirectoryScanner")
  public DirectoryScanner mainDirectoryScanner() {
    DirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();

    CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<>(
        asList(new AcceptOnceFileListFilter<>(),
            new RegexPatternFileListFilter(regexFileNamePattern)));

    recursiveDirectoryScanner.setFilter(compositeFileListFilter);
    return recursiveDirectoryScanner;
  }

  @Bean("mainFileReadingSourceMessage")
  public MessageSource<File> mainFileReadingSourceMessage(
      @Qualifier("mainDirectoryScanner") DirectoryScanner mainDirectoryScanner) {
    FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
    fileReadingMessageSource.setDirectory(new File(rootFileDir));
    fileReadingMessageSource.setScanner(mainDirectoryScanner);

    return fileReadingMessageSource;
  }
}

I am trying to test the whole flow, and to do so I created a test class:

@SpringBootTest
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
public class MainFlowIntegratoinTests {

  @Autowired
  private MockIntegrationContext mockIntegrationContext;

  @Autowired
  private SourcePollingChannelAdapter fileReadingEndpoint;

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException {

    File file = new ClassPathResource("valid01-student-01.xml").getFile();
    MessageSource<File> mockInvalidStudentFile = () -> MessageBuilder.withPayload(file).build();

    mockIntegrationContext.substituteMessageSourceFor("fileReadingEndpoint", mockInvalidStudentFile);

    // start the file adapter manually
    fileReadingEndpoint.start();
  }

}

in which I am testing my integration flow, but somehow the test does not reach the writer endpoint, i can see the logs from the reader and transformer endpoints but not from the writer.

I tried to read the documentation - https://docs.spring.io/spring-integration/reference/html/testing.html - but I cannot figure it out.

would you please give us a sample or more details on how to test the whole integration flow.

Working Tests:

package com.stackoverflow.questions;

import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

import com.stackoverflow.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;

@SpringBootTest
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
public class MainFlowIntegrationTests {


  private static final String MOCK_FILE_DIR = "intFiles/";
  private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
  private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";

  @Autowired
  private MessageChannel fileReaderChannel;

  @Autowired
  private DirectoryManagerService directoryManagerService;

  private File queueDir;
  private File processed;
  private File error;

  @BeforeEach
  public void setup() throws IOException {
    createRequiredDirectories();
    moveFilesToQueueDir();
    injectProperties();
  }

  @AfterEach
  public void tearDown() throws IOException {
    deleteRequiredDirectories();
  }

  @Test
  public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a valid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());

    // Then: the valid XML file should be sent to the processedDir
    await().until(() -> processed.list().length == 1);
  }

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a invalid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());

    // Then: the invalid XML file should be sent to the errorDir
    await().until(() -> error.list().length == 1);
  }

  private void injectProperties() {
    ReflectionTestUtils
        .setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
    ReflectionTestUtils
        .setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
  }

  private void moveFilesToQueueDir() throws IOException {
    File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();

    for (String filename : intFiles.list()) {
      FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
    }
  }

  private void createRequiredDirectories() throws IOException {
    queueDir = Files.createTempDirectory("queueDir").toFile();
    processed = Files.createTempDirectory("processedDir").toFile();
    error = Files.createTempDirectory("errorDir").toFile();
  }

  private void deleteRequiredDirectories() throws IOException {
    forceDelete(queueDir);
    forceDelete(processed);
    forceDelete(error);
  }

}

Solution

  • Doesn't look like you test or verify anything in your readingInvalidFileAndMoveItToErrorDir() at all. The fileReadingEndpoint.start(); is the last line of the test method.

    Please, consider to investigate what is JUnit and how we should write test methods and what we should do to really verify async solutions like integration flow similar to yours: https://junit.org/junit5/docs/current/user-guide/

    What I see so far is not good. The message source is about polling with some period. Your mockInvalidStudentFile is about producing the same file all the time. Is it really expected in your solution? You probably can just consider to send your file direct to the channel the message source is configured for.

    You don't need a fileReadingEndpoint.start(); since substituteMessageSourceFor() does autostart up by default.

    Your message after sending to the channel with the file as a payload is going to travel through the flow. You probably should see how you would verify the correctness of your logic in the test. Since you say that you drop the file in the end of the flow in some dir, so probably you should check that dir after sending. Maybe even using some loop or Awaitility: https://github.com/awaitility/awaitility.

    Another way is to use a substituteMessageHandlerFor() so you would place a MockMessageHandler instead of your file writer to verify the file. Note: this is going to work if you send an original message directly to the channel, not as a mock MessageSource origin and if you don't have thread shifting in your flow. So, all the flow is going to be performed in the same thread as your test method. If you have some async hand off in the flow, your MockMessageHandler should do some CountDonwLatch to make the test method to be blocked.

    Saying all of these I mean that there are a lot of nuances with testing understanding which comes with the practice and experience. It is probably not possible to do some sample for your which could be useful for other people since there solution might be different and would require other testing approach.

    Therefore my advice from here: do your best you know how you could test your own solution.