Tags: spring-integration, spring-integration-dsl

Spring Integration DSL: After .routeToRecipients() can I route the outputs of the subflows back to the main flow?


In the code below, mainFlow branches into several recipientFlows. Each recipient flow sends its result over a direct channel, PERSIST_TO_DB_CHANNEL, to a separate persistToDbFlow that processes it:

@Bean
IntegrationFlow mainFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("some/path")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .routeToRecipients(route -> route
                    .recipientFlow("some SpEL Expression", IntegrationFlows
                            .from(MessageChannels.executor(executorService))
                            .gateway(getSomeDataFromSourceA_Flow())
                            .channel(PERSIST_TO_DB_CHANNEL)
                            .get())
                    .recipientFlow("some other SpEL Expression", IntegrationFlows
                            .from(MessageChannels.executor(executorService))
                            .gateway(getSomeDataFromSourceB_Flow())
                            .channel(PERSIST_TO_DB_CHANNEL)
                            .get())
                    .defaultSubFlowMapping(flow -> flow
                            .log(WARN)
                            .nullChannel()))
            .get();
}

@Bean
IntegrationFlow persistToDbFlow() {
    return IntegrationFlows.from(PERSIST_TO_DB_CHANNEL)
            .gateway(/* persist to DB */)
            .nullChannel();
}

Is there a way to do the same thing without a separate persistToDbFlow, so that the persistence logic stays part of mainFlow, similar to how scatter-gather gathers the results back into the main flow?

I imagine it looking something like this:

@Bean
IntegrationFlow mainFlow() {
    return IntegrationFlows
            .from(Http.inboundChannelAdapter("some/path")
                    .requestMapping(mapping -> mapping.methods(HttpMethod.GET))
                    .get())
            .routeToRecipients(route -> route
                    .recipientFlow("some SpEL Expression", IntegrationFlows
                            .from(MessageChannels.executor(executorService))
                            .gateway(getSomeDataFromSourceA_Flow())
                            .returnToParentChannel())
                    .recipientFlow("some other SpEL Expression", IntegrationFlows
                            .from(MessageChannels.executor(executorService))
                            .gateway(getSomeDataFromSourceB_Flow())
                            .returnToParentChannel())
                    .defaultSubFlowMapping(flow -> flow
                            .log(WARN)
                            .nullChannel()))
            .gateway(/* persist to DB */)
            .get();
}

Solution

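  • Almost 3 years later, I have gotten a step closer to a clean solution: wrap the routeToRecipients() call in a .gateway(...) sub-flow, end every recipient flow on a shared channel, and set that channel as the gateway's replyChannel. The reply then continues in the main flow after the gateway: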

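        // GET, INFO and WARN are assumed to be static imports
        // (HttpMethod.GET, LoggingHandler.Level.INFO / WARN).
        @Bean
        IntegrationFlow mainFlow() {
            return IntegrationFlow
                    .from(Http.inboundChannelAdapter("/some/path")
                            .requestMapping(mapping -> mapping.methods(GET)))
                    // Run the router inside a gateway so the main flow waits for a reply.
                    .gateway(flow -> flow
                                    .routeToRecipients(route -> route
                                            // Every branch ends on the shared reply channel.
                                            .recipientFlow("true", f -> f
                                                    .log(INFO, "category", m -> "do something here.")
                                                    .channel("customReplyChannel"))
                                            .recipientFlow("false", f -> f
                                                    .log(INFO, "category", m -> "do something else here.")
                                                    .channel("customReplyChannel"))
                                            .defaultSubFlowMapping(f -> f
                                                    .log(WARN, "category", m -> "(hypothetical) default case")
                                                    .channel("customReplyChannel"))),
                            // The gateway takes its reply from this channel.
                            spec -> spec.replyChannel("customReplyChannel"))
                    // Placeholder: whatever follows the gateway receives the reply,
                    // so the persistence step would go here.
                    .nullChannel();
        }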
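
To connect this back to the question, here is a rough sketch of the same pattern with a persistence step in place of the trailing nullChannel(). It assumes Spring Integration 6.x (for the static IntegrationFlow.from(...)). The class name MainFlowConfig, the constant REPLY_CHANNEL and the persistToDb(...) helper are made up for illustration, and the default branch is left out for brevity. I have not run it against a live context, so treat it as a starting point rather than a verified configuration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.dsl.Http;

@Configuration
class MainFlowConfig {

    // Hypothetical channel name shared by all recipient flows and the gateway.
    private static final String REPLY_CHANNEL = "customReplyChannel";

    @Bean
    IntegrationFlow mainFlow() {
        return IntegrationFlow
                .from(Http.inboundChannelAdapter("/some/path")
                        .requestMapping(mapping -> mapping.methods(HttpMethod.GET)))
                .gateway(flow -> flow
                                .routeToRecipients(route -> route
                                        // Each branch finishes by sending to the shared reply channel.
                                        .recipientFlow("true", f -> f
                                                .log(LoggingHandler.Level.INFO, "category",
                                                        m -> "do something here.")
                                                .channel(REPLY_CHANNEL))
                                        .recipientFlow("false", f -> f
                                                .log(LoggingHandler.Level.INFO, "category",
                                                        m -> "do something else here.")
                                                .channel(REPLY_CHANNEL))),
                        // The gateway listens on the shared channel for its reply.
                        spec -> spec.replyChannel(REPLY_CHANNEL))
                // Back in the main flow: a one-way handler that consumes the reply.
                .handle(message -> persistToDb(message.getPayload()))
                .get();
    }

    // Hypothetical stand-in for the real persistence logic.
    private void persistToDb(Object payload) {
        System.out.println("persisting " + payload);
    }
}
```

One thing to keep in mind: as far as I know, a gateway hands back a single reply per request, so this sketch assumes that only one recipient expression matches for any given message (as "true" and "false" do here). If several recipients can match the same message and all of their results are needed, scatter-gather is probably still the better fit.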