spring-batch, spring-integration, spring-integration-dsl, spring-batch-integration, spring-integration-file

File monitoring and processing using Spring Batch Integration


I am currently learning Spring Batch Integration, and for that purpose I wanted to create an application that monitors a specified directory for a txt file and, if the file exists, launches a batch job that reads the file and puts the data into an H2 database. I have done the following:

// IntegrationConfig.class
    @Bean
    public IntegrationFlow integrationFlow(){
        return IntegrationFlows.from(Files.inboundAdapter(new File(inputDirectory))
                                        .autoCreateDirectory(true)
                                        .filter(new SimplePatternFileListFilter("*.txt")), p -> p.poller(pf -> pf.fixedRate(5, TimeUnit.SECONDS)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .channel("output")
                .get();
    }

    public FileMessageToJobRequest fileMessageToJobRequest(){
        FileMessageToJobRequest messageToJobRequest = new FileMessageToJobRequest();
        messageToJobRequest.setJob(job);
        messageToJobRequest.setFilename("input.file.name");
        return messageToJobRequest;
    }

    @Bean
    public JobLaunchingGateway jobLaunchingGateway(){
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        return new JobLaunchingGateway(simpleJobLauncher);
    }
// FileMessageToJobRequest
@Component
public class FileMessageToJobRequest {
    private Job job;
    private String filename = "input.file.name";

    public void setJob(Job job) {
        this.job = job;
    }

    public void setFilename(String filename) {
        this.filename = filename;
    }

    @Transformer
    public JobLaunchRequest jobLaunchRequest(Message<File> fileMessage){
        return new JobLaunchRequest(job, new JobParametersBuilder()
                .addString(filename, fileMessage.getPayload().getAbsolutePath())
                .addDate("date", new Date())
                .toJobParameters());
    }
}

and the job is:

// BatchConfig.class
    @Bean
    public Step textToH2(FlatFileItemReader<Customer> textReader, JdbcBatchItemWriter<Customer> h2Writer){
        return stepBuilderFactory.get("textToH2")
                .<Customer, Customer>chunk(chunkSize1)
                .reader(textReader)
                .writer(h2Writer)
                .build();
    }

    @Bean
    public Job job(Step textToH2, JobCompletionNotificationListener jobCompletionNotificationListener){
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .listener(jobCompletionNotificationListener)
                .start(textToH2)
                .build();
    }

The app is working, but with some problems:
1. After the file is detected, the job is launched, and after the job completes, the following error is shown:

2023-10-17 16:48:01.192 ERROR 21452 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'integrationFlow.message-handler#0' for component 'integrationFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'class path resource [filebatchintegration/integration/IntegrationConfig.class]'; from source: 'bean method integrationFlow']; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=JobLaunchRequest: job, parameters={input.file.name=C:\Desktop\in\source_100.txt.processed, date=1697536080876}, headers={file_originalFile=C:\Desktop\in\source_100.txt, id=a1f03b20-23b1-1e51-93f2-c170ed837a14, file_name=source_100.txt, file_relativePath=source_100.txt, timestamp=1697536080876}]

How can I fix this error? I tried adding a channel at the end of the integration flow, but that caused another error saying the dispatcher has no subscribers.

2. I want the app not to launch the job twice on a file that has already been processed; in the current program the job is launched every 5 seconds on the same file, and I don't want that to happen. I have tried using an additional handler that renames the extension of the processed file, and that works. But please take a look at this other app, which uses Spring Integration only to monitor a directory and, if a file exists, create a copy of it in another directory:

    @Bean
    MessageChannel greetingRequests(){
        return MessageChannels.direct().get();
    }

    @Bean
    MessageChannel greetingReplies(){
        return MessageChannels.direct().get();
    }

    @Bean
    IntegrationFlow flow(){
        return IntegrationFlows
                .from(greetingRequests())
                .transform(new GenericTransformer<String, String>() {
                    @Override
                    public String transform(String source) {
                        return source.toUpperCase();
                    }
                })
                .channel(greetingReplies())
                .get();
    }

    @Bean
    IntegrationFlow inboundFilesystemFlow(){
        File directory = new File(SystemPropertyUtils.resolvePlaceholders("${HOME}/Desktop/in"));
        return IntegrationFlows
                .from(Files.inboundAdapter(directory).autoCreateDirectory(true), p -> p.poller(pf -> pf.fixedRate(5, TimeUnit.SECONDS)))
                .transform(new FileToStringTransformer())
                .handle(new GenericHandler<String>() {
                    @Override
                    public Object handle(String payload, MessageHeaders headers) {
                        headers.forEach((k,v)-> System.out.println(k+"="+v));
                        return payload;
                    }
                })
                .channel(greetingRequests())
                .get();
    }

    @Bean
    IntegrationFlow outboundFilesystemFlow(){
        File directory = new File(SystemPropertyUtils.resolvePlaceholders("${HOME}/Desktop/out"));
        return IntegrationFlows
                .from(greetingReplies())
                .handle(new GenericHandler<String>() {
                    @Override
                    public Object handle(String payload, MessageHeaders headers) {
                        System.out.println(payload);
                        return payload;
                    }
                })
                .handle(Files.outboundAdapter(directory).autoCreateDirectory(true))
                .get();
    }

It uses the same polling approach (to monitor the directory), but it processes/copies each detected file only once. Why does it differ?

UPDATE:

  1. Adding .preventDuplicates(true) on the Files.inboundAdapter solved the 2nd problem. But why isn't it necessary in the second app?
  2. Adding .log() after .handle(jobLaunchingGateway()) solved the 1st problem. But can someone explain what's going on? (The adjusted flow is sketched below.)
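
For reference, this is roughly what the flow looks like after those two changes (everything else unchanged from the code above):

    @Bean
    public IntegrationFlow integrationFlow(){
        return IntegrationFlows.from(Files.inboundAdapter(new File(inputDirectory))
                        .autoCreateDirectory(true)
                        .preventDuplicates(true) // change 1: stop picking up the same file on every poll
                        .filter(new SimplePatternFileListFilter("*.txt")),
                p -> p.poller(pf -> pf.fixedRate(5, TimeUnit.SECONDS)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log() // change 2: with .log() as the last operator the flow becomes one-way, so the gateway's reply is only logged
                .get();
    }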

I hope someone is willing to clear up this confusion for me.


Solution

  • The .channel("output") is the correct solution, because the JobLaunchingGateway is a reply-producing handler. You need to decide what to do next with the result of that gateway, or you can just use .channel("nullChannel").

  • With that .filter(new SimplePatternFileListFilter("*.txt")) you override the .preventDuplicates(true) option. Consider using .patternFilter() instead, so the default duplicate prevention stays in effect (a combined sketch of both fixes follows below).
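
Putting both suggestions together, the flow from the question could look roughly like this (a sketch only, reusing the inputDirectory field and the fileMessageToJobRequest() and jobLaunchingGateway() beans shown above; the final handler is just one way to consume the JobExecution reply, and .channel("nullChannel") works as well if you don't care about it):

    @Bean
    public IntegrationFlow integrationFlow(){
        return IntegrationFlows.from(Files.inboundAdapter(new File(inputDirectory))
                        .autoCreateDirectory(true)
                        // patternFilter() instead of filter(new SimplePatternFileListFilter(...)),
                        // so the default duplicate prevention (AcceptOnceFileListFilter) stays in effect
                        .patternFilter("*.txt"),
                p -> p.poller(pf -> pf.fixedRate(5, TimeUnit.SECONDS)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                // the gateway replies with a JobExecution; handle it here so the flow
                // does not end with an unconsumed reply (or route it to "nullChannel")
                .handle(JobExecution.class, (jobExecution, headers) -> {
                    System.out.println("Job finished with status: " + jobExecution.getStatus());
                    return null; // returning null ends the flow here
                })
                .get();
    }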