I'm trying to use Spring integration: I receive a message via JMS and send it to Kafka:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(Jms.messageDrivenChannelAdapter(connectionFactory).destination(mqInQueue))
.log(LoggingHandler.Level.DEBUG, message -> "Received JMS message: " + message.getPayload())
.channel(channels -> MessageChannelFactory.create(channels, "request-channel-1"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(kafkaOutTopic));
}
For checks I am send messages to MQ using POST request + JmsTemplate:
@PostMapping("/mq")
public String sentToMq(@RequestBody final String body) {
jmsTemplate.convertAndSend(mqRequestQueue, body, m -> {
final var span = tracer.startScopedSpan("jms-send");
try {
final var context = span.context();
m.setStringProperty("b3", "%s-%s-%s".formatted(context.traceId(), context.spanId(), Boolean.TRUE.equals(context.sampled()) ? "1" : "0"));
} finally {
span.end();
}
return m;
});
return "done";
}
Everything works fine, except tracing. I'm set b3
header manually before sending, but after receiving Spring Integration overrides it. What should I do to keep the incoming traceId?
Also I have next configuration:
@EnableIntegrationManagement(observationPatterns = "*")
@Bean
@GlobalChannelInterceptor(order = Ordered.HIGHEST_PRECEDENCE)
public ChannelInterceptor observationPropagationChannelInterceptor(final ObservationRegistry observationRegistry) {
return new ObservationPropagationChannelInterceptor(observationRegistry);
}
OK. I know where is the problem.
The JmsMessageDrivenEndpoint
does not propagate its registerObservationRegistry(ObservationRegistry)
down to the ChannelPublishingJmsMessageListener
, therefore an IntegrationObservation.HANDLER
does not restore a trace from that b3
header.
An observation on the MessageChannel
consult only with the current context and really populate a fresh b3
header according to that current context.
Consider to use Jms.messageDrivenChannelAdapter(AbstractMessageListenerContainer)
instead and use AbstractMessageListenerContainer.setObservationRegistry()
to enable its JmsObservationDocumentation.JMS_MESSAGE_PROCESS
consumer tracing.
For JmsMessageDrivenEndpoint
, please, raise a GH issue in Spring Integration.