I am using Spring Integration 6.3.3. Our main IntegrationFlow consumes from a PubSubInboundChannelAdapter defined like this:
    @Bean
    public MessageChannel channelA() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter pubsubInboundChannelAdapter(
            @Qualifier("channelA") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.AUTO_ACK);
        adapter.setPayloadType(String.class);
        adapter.setErrorChannelName("errorChannel");
        return adapter;
    }

    @Bean
    public IntegrationFlow flowA(Filter1 filter1, Service1 service1) {
        return IntegrationFlow.from("channelA")
                .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                .filter(filter1, "filterIt1")
                .handle(service1, "handleIt1")
                .get();
    }
In addition to receiving messages from Pub/Sub, I also want to accept messages sent to an HTTP endpoint, so I added another flow using Http.inboundChannelAdapter like this:
    @Bean
    public IntegrationFlow flowHttp(IntegrationFlow flowA) {
        return IntegrationFlow.from(Http.inboundChannelAdapter("/messageA")
                        .requestMapping(m -> m.methods(HttpMethod.POST).consumes("application/json"))
                        .payloadExpression("body")
                        .requestPayloadType(String.class))
                .to(flowA);
    }
When I send an HTTP request to /messageA, the message that reaches Service1.handleIt1() does not have the flowName header.
I can fix this by adding an .enrichHeaders() call to flowHttp. Alternatively, if I comment out the pubsubInboundChannelAdapter bean, I get the enriched header.
But why is the enrichHeaders() in flowA not executed when the pubsubInboundChannelAdapter bean is present?
Just tested it and it works as expected:
    @SpringJUnitConfig
    class So79248936Tests {

        @Test
        void verifyHeaderEnricherInFlowComposition(@Autowired Consumer<String> gateway,
                @Autowired AtomicReference<Message<?>> messageHolder) {

            gateway.accept("test");

            assertThat(messageHolder.get()).isNotNull()
                    .extracting(Message::getHeaders)
                    .asInstanceOf(MAP)
                    .containsEntry("flowName", "flowA");
        }

        @Configuration
        @EnableIntegration
        static class TestConfiguration {

            @Bean
            AtomicReference<Message<?>> messageHolder() {
                return new AtomicReference<>();
            }

            @Bean
            IntegrationFlow flowA(AtomicReference<Message<?>> messageHolder) {
                return IntegrationFlow.from("ChannelA")
                        .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                        .handle(messageHolder::set)
                        .get();
            }

            @Bean
            IntegrationFlow flowGateway(IntegrationFlow flowA) {
                return IntegrationFlow.from(Consumer.class)
                        .to(flowA);
            }

        }

    }
Yes, my test configuration is very rudimentary, but it still confirms that enrichHeaders() works there.
It feels like something else is going on in your case, so it would be very helpful if you shared a simple project where we can reproduce the problem.
Please also confirm that both of your flows are declared as @Bean, and that ChannelA is resolved to the DirectChannel at the moment one flow bean is injected into the other flow's definition.
UPDATE
Here is a working example with Http.inboundChannelAdapter, as you requested:
    @SpringBootApplication
    public class So79248936Application {

        public static void main(String[] args) {
            SpringApplication.run(So79248936Application.class, args);
        }

        @Bean
        ApplicationRunner applicationRunner(RestTemplateBuilder restTemplateBuilder) {
            return args -> restTemplateBuilder.build()
                    .postForObject("http://localhost:8080/messageA", "test", String.class);
        }

        @Bean
        IntegrationFlow flowA() {
            return IntegrationFlow.from("ChannelA")
                    .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                    .handle(m -> System.out.println(m.getHeaders()))
                    .get();
        }

        @Bean
        IntegrationFlow flowGateway(IntegrationFlow flowA) {
            return IntegrationFlow.from(Http.inboundChannelAdapter("/messageA")
                            .requestMapping(m -> m.methods(HttpMethod.POST))
                            .payloadExpression("body")
                            .requestPayloadType(String.class))
                    .to(flowA);
        }

    }
And the System.out output looks like this:
{content-length=4,
http_requestMethod=POST,
upgrade=h2c,
host=localhost:8080,
http_requestUrl=http://localhost:8080/messageA,
connection=Upgrade, HTTP2-Settings,
id=cc6ca6c9-4ca6-7072-fffa-a0704bcb8c91,
contentType=text/plain;charset=UTF-8,
flowName=flowA,
user-agent=Java-http-client/17.0.12,
accept=[text/plain, application/json, application/*+json, */*],
timestamp=1733328785891}
The flowName=flowA entry is there.
So, something is fishy with your own project.
UPDATE
Thank you for the sample! I was able to reproduce it.
So, this is indeed a bug in the Spring Integration Java DSL parser.
You start the flow like this:
    IntegrationFlow.from("petStoreSubscriptionMessageChannel")
That means: resolve a bean with that name or, if there is none, create a new DirectChannel. If the bean is not there, the new DirectChannel is added to the targetIntegrationComponents so that it can be referenced at runtime. But if the bean is there, we fail to populate it back into the IntegrationFlow at runtime. Therefore the mentioned .to(petStoreSubscriptionFlow) ends up with a channel reference from the middle of the flow, one that already sits after the enrichHeaders() in question.
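To make the effect easier to picture, here is a toy model in plain Java. It is not the framework's code: the class and method names are hypothetical, channels are reduced to strings, and it assumes that .to(flow) simply targets the first channel the flow registered among its own components.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class FlowInputChannelSketch {

    /**
     * Toy model of from(channelName) followed by a single header enricher.
     * Returns the channels the flow registers as its own components, in order.
     * registerExistingBean = false mimics the bug, true mimics the intended behaviour.
     */
    static List<String> registeredChannels(String channelName,
            Set<String> existingBeans, boolean registerExistingBean) {
        List<String> registered = new ArrayList<>();
        boolean beanExists = existingBeans.contains(channelName);
        if (!beanExists || registerExistingBean) {
            // The flow's real input channel becomes one of its components.
            registered.add(channelName);
        }
        // Implicit channel that connects the header enricher to the next endpoint.
        registered.add(channelName + ".afterEnrichHeaders");
        return registered;
    }

    /** Assumption of this sketch: to(flow) targets the first channel the flow registered. */
    static String inputChannelSeenByTo(List<String> registeredChannels) {
        return registeredChannels.get(0);
    }

    public static void main(String[] args) {
        // No channel bean: the flow creates and registers its own input channel.
        System.out.println(inputChannelSeenByTo(
                registeredChannels("channelA", Set.of(), false)));
        // prints channelA

        // Channel bean exists and is not registered: the enricher is skipped.
        System.out.println(inputChannelSeenByTo(
                registeredChannels("channelA", Set.of("channelA"), false)));
        // prints channelA.afterEnrichHeaders

        // Channel bean exists and is registered: the real input channel is used.
        System.out.println(inputChannelSeenByTo(
                registeredChannels("channelA", Set.of("channelA"), true)));
        // prints channelA
    }
}
```

In the second case a message sent through .to(flow) enters after the header enricher, which matches the missing flowName header seen in the question.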
I'll fix that shortly in the framework. In the meantime, here is a workaround for you:
    @Bean
    IntegrationFlow petStoreSubscriptionFlow(
            @Qualifier("petStoreSubscriptionMessageChannel") MessageChannel inputChannel) {
        return IntegrationFlow.from(inputChannel)
Just inject the existing channel bean into this flow definition and everything works as expected.
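Applied to the channelA and flowA names from the question, the same workaround might look like the sketch below. The WorkaroundConfiguration class name is hypothetical, and the handler that prints the headers is only a stand-in for the filter and service steps of the original flow.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.MessageChannel;

@Configuration
class WorkaroundConfiguration {

    // The explicitly declared channel bean that the inbound adapter also uses.
    @Bean
    MessageChannel channelA() {
        return new DirectChannel();
    }

    // Start the flow from the injected channel instance instead of its bean name,
    // so the flow itself holds a reference to its real input channel.
    @Bean
    IntegrationFlow flowA(@Qualifier("channelA") MessageChannel inputChannel) {
        return IntegrationFlow.from(inputChannel)
                .enrichHeaders(spec -> spec.header("flowName", "flowA", true))
                // Stand-in handler; the original flow filters and calls Service1 here.
                .handle(message -> System.out.println(message.getHeaders()))
                .get();
    }
}
```

The flowHttp definition can stay as it is, since .to(flowA) then has the injected channel to resolve.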