Tags: spring, spring-integration, spring-dsl

How to set an errorChannel when I use a taskExecutor with publishSubscribeChannel?


I use publishSubscribeChannel with a taskExecutor so that the subscribers run asynchronously.

Below is the code.

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe ->
                    subscribe.subscribe(flow -> flow.channel("testFlow")))
            ..
            .enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
            .get();
}

@Bean
public IntegrationFlow testFlow (){
    return IntegrationFlows.from("testFlow")
            .handle(handlerSomeThing())
            .get();
}

As you can see, mainFlow hands messages off to testFlow. Now I want to add an errorChannel to handle exceptions thrown in testFlow. What is a good way to do that?

One approach I tried is to implement an ErrorHandler and set it on the subscribe spec, as shown below. Is there any other way?

private TestErrorHandler errorHandler;

@Bean
public IntegrationFlow mainFlow(){
    return IntegrationFlows.from("mainFlow")
            ..
            .publishSubscribeChannel(subFlowTaskExecutor, subscribe -> {
                subscribe.errorHandler(errorHandler);
                subscribe.subscribe(flow -> flow.channel("testFlow"));
            })
            ..
            .enrichHeaders(c->c.header(HttpHeaders.STATUS_CODE,HttpStatus.OK))
            .get();
}

@Component
public class TestErrorHandler implements ErrorHandler {

    @Autowired
    private MessagingTemplate messagingTemplate;

    @Autowired
    @Qualifier(RTSChannel.PerformNameScreening.ERROR_CHANNEL)
    private MessageChannel errorChannel;

    // Wrap the exception in an ErrorMessage and forward it to the error channel.
    @Override
    public void handleError(Throwable throwable) {
        messagingTemplate.send(errorChannel, new ErrorMessage(throwable));
    }

    @Bean
    public MessagingTemplate errorMessagingTemplate() {
        return new MessagingTemplate();
    }

}


Solution

    @Bean
    public IntegrationFlow mainFlow(){
        return IntegrationFlows.from("mainFlow")
    

    You can't have a flow bean with the same name as the first channel in the flow, because the flow and the channel would both be registered under the bean name mainFlow. Assuming you meant:

    @Bean
    public IntegrationFlow mainFlow(){
        return IntegrationFlows.from("mainFlowChannel")
    

    Normally, the error channel would be configured on some component upstream of mainFlowChannel, so that it covers the whole flow.

    If you want to scope the error handling just within the subflow, you would need to use a .gateway() there.

                .publishSubscribeChannel(subFlowTaskExecutor, subscribe ->
                        subscribe.subscribe(flow ->
                                flow.gateway(testFlow(), g -> g.errorChannel("errorFlowInputChannel"))))
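To illustrate why the error handling has to be scoped at the point where the subscriber runs, here is a minimal plain-Java sketch of the idea. It does not use Spring Integration at all: the class ScopedErrorHandlingSketch, the errorChannel queue, and the withErrorChannel helper are hypothetical stand-ins, assumed only for this illustration. A task handed to an executor runs on another thread, so an exception it throws never reaches the thread that dispatched it; wrapping the task is what lets the failure be routed somewhere useful.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy only; none of these names come from Spring Integration.
final class ScopedErrorHandlingSketch {

    // Stand-in for an error channel: failures are queued here instead of being lost.
    static final BlockingQueue<Throwable> errorChannel = new LinkedBlockingQueue<>();

    // Wraps a subscriber so that any runtime exception it throws is routed
    // to the error channel rather than escaping on the executor thread.
    static Runnable withErrorChannel(Runnable subscriber) {
        return () -> {
            try {
                subscriber.run();
            } catch (RuntimeException e) {
                errorChannel.add(e);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // The "subflow" fails on the executor thread; the caller is not interrupted.
        executor.execute(withErrorChannel(() -> {
            throw new IllegalStateException("testFlow failed");
        }));

        // Whoever consumes the error channel sees the failure.
        Throwable error = errorChannel.poll(5, TimeUnit.SECONDS);
        System.out.println("Routed error: " + (error == null ? "none" : error.getMessage()));

        executor.shutdown();
    }
}
```

Both the ErrorHandler from the question and the .gateway() with an errorChannel play roughly the role of withErrorChannel here: they catch the failure on the executor thread and forward it to a channel that an error-handling flow can consume.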