java spring-boot spring-integration spring-integration-dsl spring-integration-file

Spring Integration DSL Custom Error Channel Issue with Executor


Hi, I have a file listener that reads files in parallel (more than one at a time):

package com.example.demo.flow;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.concurrent.Executors;

/**
 * Created by muhdk on 03/01/2020.
 */
@Component
@Slf4j
public class TestFlow {

    @Bean
    public StandardIntegrationFlow errorChannelHandler() {

        return IntegrationFlows.from("testChannel")
                .handle(o -> {

                    log.info("Handling error....{}", o);
                }).get();
    }

    @Bean
    public IntegrationFlow testFile() {


        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                        .errorChannel("testChannel")))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {

                    throw new RuntimeException("Failing on purpose");

                }).handle(o -> {
                });

        return testChannel.get();


    }


}

It's not going to my custom error channel,

but if I remove the line

            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))

Then it goes to error channel.

How can I make it go to my custom error channel when using the executor?


Solution

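  • It looks like the poller's errorChannel does not apply once messages are handed off to an Executor channel: the downstream steps then run on the executor's threads instead of the poller's thread, so the poller's error handler never sees the exception, and the error channel is resolved from the message's errorChannel header instead.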

    I made a change like this

    /**
     * Builds the flow that polls {@code d:/input-files/} and passes each file
     * through an executor channel to a transformer that always fails.
     *
     * <p>The error channel name {@code "testChannel"} is written into each
     * message's headers before the executor channel, rather than being
     * configured on the poller.
     *
     * @return the file-polling flow
     */
    @Bean
    public IntegrationFlow testFile() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("d:/input-files/")),
                        adapter -> adapter.poller(
                                Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)))
                // Set the error channel header before the hand-off to the executor threads.
                .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(payload -> {
                    // Deliberate failure so that error routing can be observed.
                    throw new RuntimeException("Failing on purpose");
                })
                .handle(message -> {
                    // Intentionally empty terminal handler.
                })
                .get();
    }
    

    The key change is this line:

            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
    

    The rest remains the same, and it works.