I am facing problems with setting up the trace propagation correctly, if there is a QueueChannel
with a poller in between :(
Flow is initiated by HTTP call, that triggers MessagingGateway
from spring integration:
@RestController
@RequiredArgsConstructor
class FooController {
private final InitiateSftpImportGateway gateway;
@ResponseStatus(HttpStatus.NO_CONTENT)
@PostMapping("/bar")
void runImportAt(@RequestParam String path) {
gateway.runImportAt(path);
}
@MessagingGateway(defaultRequestChannel = "sftpListingChannel", errorChannel = "errorChannel")
interface InitiateSftpImportGateway {
@Gateway
void runImportAt(String path);
}
}
channels are defined as follows:
@Bean
MessageChannel sftpListingChannel(final ObservationRegistry observationRegistry) {
final var directChannel = new DirectChannel();
directChannel.registerObservationRegistry(observationRegistry);
return directChannel;
}
@Bean
MessageChannel sftpFetchingChannel(final ObservationRegistry observationRegistry) {
final var queueChannel = new QueueChannel();
queueChannel.registerObservationRegistry(observationRegistry);
return queueChannel;
}
and a snippet from actual flow - based on trigger, sftpListingChannel
pulls files on SFTP:
@Bean
@ServiceActivator(inputChannel = "sftpListingChannel", outputChannel = "fileSplittingChannel")
MessageHandler listFiles(final CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
final var outboundGateway = new SftpOutboundGateway(
cachingSessionFactory,
AbstractRemoteFileOutboundGateway.Command.LS.getCommand(),
"payload"
);
outboundGateway.setOption(AbstractRemoteFileOutboundGateway.Option.RECURSIVE);
return outboundGateway;
}
and splits them:
@Splitter(inputChannel = "fileSplittingChannel", outputChannel = "sftpFetchingChannel")
List<FileInfo<SftpClient.DirEntry>> splitByFile(@Payload final List<FileInfo<SftpClient.DirEntry>> fileInfo) {
return fileInfo;
}
for each file - we download it:
@Bean
@ServiceActivator(
inputChannel = "sftpFetchingChannel",
outputChannel = "...",
poller = @Poller(value = "transactionalPoller")
)
MessageHandler fetchFiles(CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
final var outboundGateway = new SftpOutboundGateway(
cachingSessionFactory,
AbstractRemoteFileOutboundGateway.Command.GET.getCommand(),
Constants.Expressions.FILE_LOCATION_FROM_HEADER
);
outboundGateway.setOption(
AbstractRemoteFileOutboundGateway.Option.STREAM,
AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP
);
return outboundGateway;
}
which uses a custom-poller, to inject transaction advice:
@Bean
PollerMetadata transactionalPoller(final TransactionManager transactionManager) {
final var pollerMetadata = new PollerMetadata();
pollerMetadata.setAdviceChain(
List.of(new TransactionInterceptorBuilder().transactionManager(transactionManager).build())
);
return pollerMetadata;
}
The poller (?) terminates initial trace and initialises a new one - this is troublesome as I would like to track the overall flow and just relay on span IDs to deep-dive into particular file-sub-process.
My biggest concern is the errorChannel
- if some sub-operation after split throws an error, it is handled on errorChannel
but with a new trace ID :(
Kindly advise if/how should I adjust my config to make trace propagation smoother.
Disclaimer:
get
there is ls
first (or even an adapter) - code above is just a small sub-set of bigger flowFor the record:
Adjusted config for queue channel registration:
@Bean
MessageChannel sftpFetchingChannel() {
return new QueueChannel();
}
(btw - do I still need to call registerObservationRegistry
here?)
and added
@Bean
@GlobalChannelInterceptor
ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor(
final ObservationRegistry observationRegistry
) {
return new ObservationPropagationChannelInterceptor(observationRegistry);
}
this solved the trace propagation "end-to-end", ...without error channel :( there, I still get new trace being initiated or none at all (depends on further config); I tried my own registration:
@Bean
MessageChannel errorChannel(final ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor) {
final var directChannel = new DirectChannel();
directChannel.addInterceptor(observationPropagationChannelInterceptor); //ignored whatsoever and no trace
//directChannel.registerObservationRegistry(observationRegistry); creates new trace
return directChannel;
}
do I miss something in regards to errorChannel
definition?
btw, consumer of error channel is:
@Bean
@ServiceActivator(inputChannel = "errorChannel")
MessageHandler handleIntegrationErrors() {
return message -> {
// do some logging
}
};
}
You really are missing an Observation Propagation. The QueueChannel
is consumed on the other thread and there is no knowledge about the trace in the producer thread. So, you need to configure the mentioned ObservationPropagationChannelInterceptor
to propagate the trace from producer to consumer via message stored in that channel.