spring-integrationspring-integration-dsl

spring integration DSL Flow to another Flow does not execute enrichedHeaders


I am using Spring Integration 6.3.3. I have our main IntegrationFlow consuming from a PubSubInboundAdapter defined like this:

@Bean
public MessageChannel channelA() {
    return new DirectChannel();
}

@Bean
public PubSubInboundChannelAdapter pubsubInboundChannelAdapter(
        @Qualifier("channelA") MessageChannel inputChannel,
        PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = 
        new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.AUTO_ACK);
    adapter.setPayloadType(String.class);
    adapter.setErrorChannelName("errorChannel");
    return adapter;
}

@Bean
public IntegrationFlow flowA(Filter1 filter1, Service1 service1) {
    return IntegrationFlow.from("channelA")
                .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                .filter(filter1, "filterIt1")
                .handle(service1, "handleIt1")
                .get();
}

In addition to receiving messages from a PubSub, I also want to allow messages being sent via Http endpoint.

So I added another flow leveraging Http.inboundChannelAdapter like this:

@Bean
public IntegrationFlow flowHttp(IntegrationFlow flowA) {
    return IntegrationFlow.from(Http.inboundChannelAdapter("/messageA")
                        .requestMapping(m -> m.methods(HttpMethod.POST).consumes("application/json"))
                        .payloadExpression("body")
                        .requestPayloadType(String.class))
                .to(flowA);
}

When I send an Http request to /messageA, the message sent to Service1.handleIt1() does not have the header flowName.

I can fix this by adding a call to .enrichHeaders() to the flowHttp. Or if I comment out the pubsubInboundChannelAdapter, I get the enriched header.

But why doesn't the enrichHeaders() in flowA is not executed if I have the pubsubInboundChannelAdapter bean?


Solution

  • Just tested it and it works as expected:

    @SpringJUnitConfig
    class So79248936Tests {
    
        @Test
        void verifyHeaderEnricherInFlowComposition(@Autowired Consumer<String> gateway,
                @Autowired AtomicReference<Message<?>> messageHolder) {
    
            gateway.accept("test");
            assertThat(messageHolder.get()).isNotNull()
                    .extracting(Message::getHeaders)
                    .asInstanceOf(MAP)
                    .containsEntry("flowName", "flowA");
        }
    
        @Configuration
        @EnableIntegration
        static class TestConfiguration {
    
            @Bean
            AtomicReference<Message<?>> messageHolder() {
                return new AtomicReference<>();
            }
    
            @Bean
            IntegrationFlow flowA(AtomicReference<Message<?>> messageHolder) {
                return IntegrationFlow.from("ChannelA")
                        .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                        .handle(messageHolder::set)
                        .get();
            }
    
            @Bean
            IntegrationFlow flowGateway(IntegrationFlow flowA) {
                return IntegrationFlow.from(Consumer.class)
                        .to(flowA);
            }
    
        }
    
    }
    

    Yes, my test configuration is very rudimentary, but that still confirms that enrichHeaders() works there anyway.

    Feels like something else is going on in your case. So, it might be very helpful if you share with us a simple project where we can reproduce the problem.

    Please, also confirm that both your flows are marked as @Bean. That ChannelA is resolved to the DirectChannel at the moment of injection of one flow bean into another definition.

    UPDATE

    Here is a working example with Http.inboundChannelAdapter as you have requested:

    @SpringBootApplication
    public class So79248936Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So79248936Application.class, args);
        }
    
        @Bean
        ApplicationRunner applicationRunner(RestTemplateBuilder restTemplateBuilder) {
            return args -> restTemplateBuilder.build().postForObject("http://localhost:8080/messageA", "test", String.class);
        }
    
    
        @Bean
        IntegrationFlow flowA() {
            return IntegrationFlow.from("ChannelA")
                    .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                    .handle(m -> System.out.println(m.getHeaders()))
                    .get();
        }
    
        @Bean
        IntegrationFlow flowGateway(IntegrationFlow flowA) {
            return IntegrationFlow.from(Http.inboundChannelAdapter("/messageA")
                            .requestMapping(m -> m.methods(HttpMethod.POST))
                            .payloadExpression("body")
                            .requestPayloadType(String.class))
                    .to(flowA);
        }
    
    }
    

    And SOUT result is like this:

    {content-length=4, 
    http_requestMethod=POST, 
    upgrade=h2c, 
    host=localhost:8080, 
    http_requestUrl=http://localhost:8080/messageA, 
    connection=Upgrade, HTTP2-Settings, 
    id=cc6ca6c9-4ca6-7072-fffa-a0704bcb8c91, 
    contentType=text/plain;charset=UTF-8, 
    flowName=flowA, 
    user-agent=Java-http-client/17.0.12, 
    accept=[text/plain, application/json, application/*+json, */*], 
    timestamp=1733328785891}
    

    The flowName=flowA entry is there.

    So, something is fishy with your own project.

    UPDATE

    Thank you for the sample! I was able to reproduce.

    So, this is indeed a bug in Spring Integration Java DSL parser.

    You do like this:

    IntegrationFlow.from("petStoreSubscriptionMessageChannel")
    

    And that means resolve a bean for that name or create a new one for DirectChannel. If bean is not there, then this DirectChannel is provided for the targetIntegrationComponents to be referenced at runtime. But if bean is there, then we are missing to populated it back to IntegrationFlow runtime. Therefore the mentioned .to(petStoreSubscriptionFlow) ends up with a channel reference from the middle of the flow, which is already after the enrichHeaders() in the subject.

    I'll fix that shortly in the framework. And here is a workaround for you:

    @Bean
    IntegrationFlow petStoreSubscriptionFlow(
            @Qualifier("petStoreSubscriptionMessageChannel") MessageChannel inputChannel) {
        return IntegrationFlow.from(inputChannel)
    

    Just inject that existing channel bean into this flow definition and all works as expected.