Hi, I have a file listener that reads files in parallel (more than one at a time).
    package com.example.demo.flow;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.dsl.*;
    import org.springframework.integration.dsl.channel.MessageChannels;
    import org.springframework.integration.file.dsl.Files;
    import org.springframework.stereotype.Component;

    import java.io.File;
    import java.util.concurrent.Executors;

    /**
     * Created by muhdk on 03/01/2020.
     */
    @Component
    @Slf4j
    public class TestFlow {

        @Bean
        public StandardIntegrationFlow errorChannelHandler() {
            return IntegrationFlows.from("testChannel")
                    .handle(o -> {
                        log.info("Handling error....{}", o);
                    }).get();
        }

        @Bean
        public IntegrationFlow testFile() {
            IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                    e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
                            .errorChannel("testChannel")))
                    .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                    .transform(o -> {
                        throw new RuntimeException("Failing on purpose");
                    }).handle(o -> {
                    });
            return testChannel.get();
        }
    }
The error is not going to my custom error channel, but if I remove the line
    .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
then it does go to the error channel.
How can I make errors reach my custom error channel while still using the executor?
It looks like the errorChannel set on the poller is not used once an executor channel processes the messages, and I have no idea why.
I changed the flow like this:
    // needs: import org.springframework.messaging.MessageHeaders;
    @Bean
    public IntegrationFlow testFile() {
        IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
                e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)))
                // set the error channel on each message before the hand-off to the executor
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
                .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
                .transform(o -> {
                    throw new RuntimeException("Failing on purpose");
                }).handle(o -> {
                });
        return testChannel.get();
    }
The key line is
    .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
The rest remains the same, and it works.
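A likely explanation, as far as I understand it: the errorChannel on the poller only covers exceptions thrown on the polling thread. Once the message is handed to an executor channel, the transformer fails on a pool thread, so the error destination has to travel with the message (the errorChannel header) or fall back to the global default. The sketch below is only a plain-Java analogy of that hand-off, not Spring Integration code; `ExecutorErrorRoutingSketch`, `Msg`, `errorRoute`, and `Result` are hypothetical names invented for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java analogy only: no Spring types are used, all names are hypothetical.
class ExecutorErrorRoutingSketch {

    // Stand-in for a message that carries its own error destination,
    // similar in spirit to an errorChannel header.
    static final class Msg {
        final String payload;
        final Consumer<Throwable> errorRoute;

        Msg(String payload, Consumer<Throwable> errorRoute) {
            this.payload = payload;
            this.errorRoute = errorRoute;
        }
    }

    // Holds what each error-handling strategy observed.
    static final class Result {
        final List<String> callerSide = new CopyOnWriteArrayList<>();
        final List<String> viaHeader = new CopyOnWriteArrayList<>();
    }

    static Result run() throws InterruptedException {
        Result result = new Result();
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            for (String name : List.of("a.txt", "b.txt")) {
                Msg msg = new Msg(name, t -> result.viaHeader.add(name + ": " + t.getMessage()));
                try {
                    // Hand-off to another thread: execute() returns immediately.
                    pool.execute(() -> {
                        try {
                            throw new RuntimeException("Failing on purpose");
                        } catch (RuntimeException ex) {
                            // The worker only knows the route stored on the message.
                            msg.errorRoute.accept(ex);
                        }
                    });
                } catch (RuntimeException ex) {
                    // Analogous to the poller-level handler: it never runs for
                    // failures that happen later on a pool thread.
                    result.callerSide.add(name + ": " + ex.getMessage());
                }
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run();
        System.out.println("caller-side catch saw: " + r.callerSide.size());
        System.out.println("routed via message:    " + r.viaHeader.size());
    }
}
```

In this sketch the caller-side list stays empty while both failures arrive through the route stored on the message, which mirrors why enriching the header before the executor channel makes the custom error flow receive the exception.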