I'm using Spring Integration.
Most of my flows run on DirectChannels, which guarantee synchronous processing of events. However, I've noticed that when an error occurs in one of these flows, the error flow is triggered on a separate thread.
My logging logic relies heavily on MDC properties, so I'm not able to trace the errors back to the source flow.
@Configuration
public class GenericErrorFlow {

    public static final String CHANNEL = "errorChannel";

    @Bean
    QueueChannel errorChannel() {
        return new QueueChannel(500);
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedRate(500)
                .errorChannel(CHANNEL);
    }
}
@Configuration
public class ExampleFlow {

    public static final String CHANNEL = "exampleFlowChannel";

    @Bean(name = CHANNEL)
    public MessageChannel getChannel() {
        DirectChannel dc = new DirectChannel();
        dc.setComponentName(CHANNEL);
        return dc;
    }

    @Bean
    public IntegrationFlow exampleFlow() {
        return IntegrationFlows.from(CHANNEL)
                .<ConsumerRecord>handle((message, h) -> {
                    // perform some logic here
                    return null; // no reply: the flow ends at this handler
                }).get();
    }
}
@MessagingGateway(errorChannel = GenericErrorFlow.CHANNEL)
public interface MessageGateway {
    // other routings here
}
Below are the few possible approaches I could think of (in order of preference):
Please suggest which of them I should go with (an entirely new approach is also fine). I would also appreciate some pointers on how to implement approaches 1 and 2.
Your errorChannel is declared as a QueueChannel. It is therefore pollable, and its messages are processed by scheduled tasks. The TaskScheduler threads running those tasks know nothing about the producing thread, so you cannot implement your idea #1.
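To make the thread hand-off concrete, here is a minimal JDK-only sketch. It does not use Spring Integration: the ThreadLocal is a stand-in for MDC, the BlockingQueue is a stand-in for the QueueChannel, and all names (QueueHandOffDemo, TRACE_ID, "flow-42") are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueHandOffDemo {

    // Stand-in for MDC: like MDC, the value is bound to the current thread only.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Queues an error from the calling thread and returns the TRACE_ID seen by the poller thread. */
    static String traceIdSeenByPoller() {
        BlockingQueue<String> errorQueue = new ArrayBlockingQueue<>(500);
        ExecutorService poller = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("flow-42");             // set on the producing thread
            errorQueue.put("something failed");  // only the payload crosses the queue
            Future<String> seen = poller.submit(() -> {
                errorQueue.take();               // consumed on the poller thread
                return TRACE_ID.get();           // nothing was set on this thread
            });
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            poller.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints "poller sees: null"
        System.out.println("poller sees: " + traceIdSeenByPoller());
    }
}
```

The value set by the producer never reaches the poller thread unless it travels with the message itself.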
The second idea is the simplest: it is enough to make your errorChannel a DirectChannel.
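A minimal sketch of that change, assuming the rest of your configuration stays as it is. The bean name genericErrorChannel is a hypothetical one picked for this example; the gateway keeps referring to GenericErrorFlow.CHANNEL.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;

@Configuration
public class GenericErrorFlow {

    // Hypothetical bean name used for this sketch.
    public static final String CHANNEL = "genericErrorChannel";

    // A DirectChannel invokes its subscriber on the sender's thread,
    // so the MDC of the failing flow is still in place in the error flow.
    @Bean(name = CHANNEL)
    DirectChannel genericErrorChannel() {
        return new DirectChannel();
    }
}
```

With this in place the error flow no longer needs a poller, at the cost of running the error handling inside the caller's thread.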
The third one is fully OK, and that's exactly what we do in the ThreadStatePropagationChannelInterceptor. You just need to implement this interceptor and specify which thread state you would like to propagate (and how) from the producer to the consumer over that QueueChannel.
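Here is a sketch of such an interceptor for MDC, assuming SLF4J's MDC is the logging context in use. The class names (MdcPropagatingErrorFlow, MdcPropagationInterceptor) and the bean name are hypothetical; the two overridden methods are the abstract hooks of ThreadStatePropagationChannelInterceptor.

```java
import java.util.Map;

import org.slf4j.MDC;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.ThreadStatePropagationChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MdcPropagatingErrorFlow {

    // Hypothetical bean name used for this sketch.
    public static final String CHANNEL = "genericErrorChannel";

    @Bean(name = CHANNEL)
    QueueChannel genericErrorChannel() {
        QueueChannel channel = new QueueChannel(500);
        channel.addInterceptor(new MdcPropagationInterceptor());
        return channel;
    }

    /** Carries the producer's MDC map along with each message sent to the channel. */
    static class MdcPropagationInterceptor
            extends ThreadStatePropagationChannelInterceptor<Map<String, String>> {

        // Called on the sending (producer) thread: snapshot its MDC.
        @Override
        protected Map<String, String> obtainPropagatingContext(Message<?> message,
                MessageChannel channel) {
            return MDC.getCopyOfContextMap();
        }

        // Called on the receiving (poller) thread: restore the snapshot there.
        @Override
        protected void populatePropagatedContext(Map<String, String> state,
                Message<?> message, MessageChannel channel) {
            if (state != null) {
                MDC.setContextMap(state);
            } else {
                MDC.clear();
            }
        }
    }
}
```

One thing to watch in this sketch: the restored MDC stays on the poller thread until the next message overwrites it, so you may want to clear it at the end of the error flow.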
NOTE: it is not OK to name your error channel errorChannel, since that name is reserved for the global default error channel:
https://docs.spring.io/spring-integration/reference/channel/special-channels.html
https://docs.spring.io/spring-integration/reference/error-handling.html