Tags: spring-integration, spring-micrometer, spring-integration-sftp, micrometer-tracing

Spring Integration: trace propagation via Micrometer across different channels?


I am having trouble setting up trace propagation correctly when there is a QueueChannel with a poller in between. :(

The flow is initiated by an HTTP call that triggers a Spring Integration MessagingGateway:

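    import lombok.RequiredArgsConstructor;
    import org.springframework.http.HttpStatus;
    import org.springframework.integration.annotation.Gateway;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseStatus;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    @RequiredArgsConstructor
    class FooController {

        private final InitiateSftpImportGateway gateway;

        @ResponseStatus(HttpStatus.NO_CONTENT)
        @PostMapping("/bar")
        void runImportAt(@RequestParam String path) {
            gateway.runImportAt(path);
        }

        @MessagingGateway(defaultRequestChannel = "sftpListingChannel", errorChannel = "errorChannel")
        interface InitiateSftpImportGateway {
            @Gateway
            void runImportAt(String path);
        }
    }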

The channels are defined as follows:

    @Bean
    MessageChannel sftpListingChannel(final ObservationRegistry observationRegistry) {
        final var directChannel = new DirectChannel();
        directChannel.registerObservationRegistry(observationRegistry);
        return directChannel;
    }

    @Bean
    MessageChannel sftpFetchingChannel(final ObservationRegistry observationRegistry) {
        final var queueChannel = new QueueChannel();
        queueChannel.registerObservationRegistry(observationRegistry);
        return queueChannel;
    }
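The key difference between the two channel types is the thread on which the consumer runs. As a plain-JDK analogy (no Spring classes; the class and method names are made up for illustration), a DirectChannel invokes its subscriber on the sending thread, whereas a QueueChannel only buffers the message and a poller thread picks it up later:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Plain-JDK analogy, not Spring code: where does the subscriber run for a
// "direct" hand-off versus a "queue + poller" hand-off?
public class ChannelThreadingDemo {

    // DirectChannel-like: sending invokes the subscriber synchronously on the caller thread.
    static String handledOnDirect(String payload) {
        String[] handledOn = new String[1];
        Consumer<String> subscriber = p -> handledOn[0] = Thread.currentThread().getName();
        subscriber.accept(payload);
        return handledOn[0];
    }

    // QueueChannel-like: sending only enqueues; a separate poller thread takes and handles it.
    static String handledOnQueue(String payload) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        CompletableFuture<String> handledOn = new CompletableFuture<>();
        Thread poller = new Thread(() -> {
            try {
                queue.take();
                handledOn.complete(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                handledOn.completeExceptionally(e);
            }
        }, "poller-1");
        poller.start();
        queue.add(payload); // the producer returns right after enqueuing
        return handledOn.join();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("direct: " + handledOnDirect("a.csv"));
        System.out.println("queue:  " + handledOnQueue("a.csv"));
    }
}
```

Anything bound to the sending thread is therefore still visible to a direct subscriber, but not to the queue consumer.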

And here is a snippet from the actual flow. When triggered, the handler on sftpListingChannel lists the files on the SFTP server:

    @Bean
    @ServiceActivator(inputChannel = "sftpListingChannel", outputChannel = "fileSplittingChannel")
    MessageHandler listFiles(final CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
        final var outboundGateway = new SftpOutboundGateway(
            cachingSessionFactory,
            AbstractRemoteFileOutboundGateway.Command.LS.getCommand(),
            "payload"
        );
        outboundGateway.setOption(AbstractRemoteFileOutboundGateway.Option.RECURSIVE);
        return outboundGateway;
    }

and splits them:

    @Splitter(inputChannel = "fileSplittingChannel", outputChannel = "sftpFetchingChannel")
    List<FileInfo<SftpClient.DirEntry>> splitByFile(@Payload final List<FileInfo<SftpClient.DirEntry>> fileInfo) {
        return fileInfo;
    }
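As far as I understand the default splitter behaviour, each emitted message copies the headers of the incoming message and adds sequence details (correlationId, sequenceNumber, sequenceSize). The sketch below models this with a hypothetical Msg record instead of Spring's Message API. The traceparent header is only an assumed example of a trace header; the actual name depends on the tracer in use. The point is that whatever travels in headers survives the split, while context held only on the current thread does not travel with the messages:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified splitter model; NOT Spring's Message/MessageBuilder API.
public class SplitterSketch {

    record Msg(Object payload, Map<String, Object> headers) {}

    // Emit one message per list element, copying the parent's headers and adding sequence info.
    static List<Msg> split(Msg parent) {
        List<?> items = (List<?>) parent.payload();
        List<Msg> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            Map<String, Object> headers = new HashMap<>(parent.headers()); // e.g. an inherited trace header
            headers.put("correlationId", parent.headers().get("id"));
            headers.put("sequenceNumber", i + 1);
            headers.put("sequenceSize", items.size());
            out.add(new Msg(items.get(i), headers));
        }
        return out;
    }

    public static void main(String[] args) {
        Msg listing = new Msg(List.of("a.csv", "b.csv"), Map.of("id", "msg-1", "traceparent", "00-abc-01"));
        split(listing).forEach(System.out::println);
    }
}
```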

For each file, we download it:

    @Bean
    @ServiceActivator(
        inputChannel = "sftpFetchingChannel",
        outputChannel = "...",
        poller = @Poller(value = "transactionalPoller")
    )
    MessageHandler fetchFiles(CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
        final var outboundGateway = new SftpOutboundGateway(
            cachingSessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET.getCommand(),
            Constants.Expressions.FILE_LOCATION_FROM_HEADER
        );
        outboundGateway.setOption(
            AbstractRemoteFileOutboundGateway.Option.STREAM,
            AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP
        );
        return outboundGateway;
    }

which uses a custom poller to inject a transaction advice:

    @Bean
    PollerMetadata transactionalPoller(final TransactionManager transactionManager) {
        final var pollerMetadata = new PollerMetadata();

        pollerMetadata.setAdviceChain(
            List.of(new TransactionInterceptorBuilder().transactionManager(transactionManager).build())
        );

        return pollerMetadata;
    }
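My understanding is that a poller's adviceChain wraps the whole poll cycle (receive plus downstream handling), so an exception thrown by the handler passes through the transaction advice before the poller's own error handling takes over; by default that error handling publishes an ErrorMessage to errorChannel on the poller thread. The following is a hypothetical JDK-only model of such an advice, not Spring's TransactionInterceptor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model (not Spring's TransactionInterceptor): an "advice" wraps the whole
// poll task, i.e. receive + handle, so a handler failure propagates through it.
public class AdvisedPollSketch {

    static final List<String> LOG = new ArrayList<>();

    static final UnaryOperator<Runnable> TRANSACTION_ADVICE = task -> () -> {
        LOG.add("begin");
        try {
            task.run();
            LOG.add("commit");
        } catch (RuntimeException e) {
            LOG.add("rollback");
            throw e; // in this model, the caller of poll() plays the role of the poller's error handler
        }
    };

    // One poll cycle: apply the advice around the receive-and-handle task.
    static void poll(Runnable receiveAndHandle) {
        TRANSACTION_ADVICE.apply(receiveAndHandle).run();
    }

    public static void main(String[] args) {
        poll(() -> LOG.add("handle a.csv"));
        try {
            poll(() -> { throw new IllegalStateException("download failed"); });
        } catch (IllegalStateException expected) {
            LOG.add("error handler: " + expected.getMessage());
        }
        System.out.println(LOG);
    }
}
```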

The poller (?) terminates the initial trace and starts a new one. This is troublesome because I would like to track the overall flow under one trace and rely on span IDs to drill down into a particular file's sub-process.
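Why the trace breaks at this point can be reproduced without Spring. Tracing libraries generally keep the current span or observation in thread-bound state; the sketch below uses a plain ThreadLocal as an assumed stand-in for that state. The producer thread sets it, but the poller thread that takes the message from the queue starts with an empty ThreadLocal:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

public class ThreadLocalTraceLoss {

    // Assumed stand-in for a tracer's "current span/observation" holder.
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    // Returns what the poller thread sees as the "current trace" after a queue hand-off.
    static String traceSeenByPoller(String producerTrace) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1);
        CompletableFuture<String> seen = new CompletableFuture<>();
        CURRENT_TRACE.set(producerTrace); // e.g. the HTTP request thread, inside a trace
        try {
            channel.add("a.csv");
            Thread poller = new Thread(() -> {
                try {
                    channel.take();
                    seen.complete(CURRENT_TRACE.get()); // null: ThreadLocal values are per thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    seen.completeExceptionally(e);
                }
            }, "poller-1");
            poller.start();
            return seen.join();
        } finally {
            CURRENT_TRACE.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("poller sees: " + traceSeenByPoller("trace-123")); // prints "poller sees: null"
    }
}
```

The producer still holds its trace while the poller runs; the poller simply has no access to it unless something carries it across.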

My biggest concern is the errorChannel: if some sub-operation after the split throws an exception, it is handled on errorChannel, but with a new trace ID. :(

Please advise whether and how I should adjust my configuration so that the trace is propagated across the whole flow.



Update @ 02JAN (based on a comment from @artem):

I adjusted the config for the queue channel registration:

    @Bean
    MessageChannel sftpFetchingChannel() {
        return new QueueChannel();
    }

(BTW, do I still need to call registerObservationRegistry here?)

and added a global interceptor:

    @Bean
    @GlobalChannelInterceptor
    ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor(
        final ObservationRegistry observationRegistry
    ) {
        return new ObservationPropagationChannelInterceptor(observationRegistry);
    }

This solved the trace propagation end-to-end, except for the error channel. :( There I still get either a new trace or no trace at all (depending on further config). I tried registering the error channel myself:

    @Bean
    MessageChannel errorChannel(final ObservationPropagationChannelInterceptor observationPropagationChannelInterceptor) {
        final var directChannel = new DirectChannel();
        directChannel.addInterceptor(observationPropagationChannelInterceptor); // seems to be ignored: no trace at all
        // directChannel.registerObservationRegistry(observationRegistry); // starts a new trace instead
        return directChannel;
    }

Am I missing something regarding the errorChannel definition?

BTW, the consumer of the error channel is:

    @Bean
    @ServiceActivator(inputChannel = "errorChannel")
    MessageHandler handleIntegrationErrors() {
        return message -> {
            // do some logging
        };
    }
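Independently of propagation, the error handler can still correlate a failure with the original flow if the failed message carried trace headers. In Spring Integration the ErrorMessage payload is usually a MessagingException whose getFailedMessage() returns the message that failed. The sketch below models that with hypothetical Msg and FailedMessageException types and an assumed traceparent header; it only recovers the trace id for logging and does not reopen the original trace:

```java
import java.util.Map;

// Hypothetical stand-ins for Spring's Message and MessagingException; NOT the real classes.
public class ErrorHandlerSketch {

    record Msg(Object payload, Map<String, Object> headers) {}

    static final class FailedMessageException extends RuntimeException {
        final Msg failedMessage;

        FailedMessageException(String reason, Msg failedMessage) {
            super(reason);
            this.failedMessage = failedMessage;
        }
    }

    // Prefer the trace header carried by the failed message; otherwise fall back to whatever
    // trace the error-handling thread currently has (possibly a new one, or none).
    static String traceForLogging(Throwable error, String currentThreadTrace) {
        if (error instanceof FailedMessageException failed) {
            Object trace = failed.failedMessage.headers().get("traceparent"); // assumed header name
            if (trace != null) {
                return trace.toString();
            }
        }
        return currentThreadTrace;
    }

    public static void main(String[] args) {
        Msg failed = new Msg("a.csv", Map.of("traceparent", "00-abc-01"));
        System.out.println(traceForLogging(new FailedMessageException("download failed", failed), "fresh-trace"));
    }
}
```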

Solution

  • You really are missing observation propagation. The QueueChannel is consumed on another thread, and that consumer thread has no knowledge of the trace started on the producer thread. So you need to configure the mentioned ObservationPropagationChannelInterceptor to propagate the trace from the producer to the consumer via the message stored in that channel.
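The capture-on-send / restore-on-consume idea described in the answer can be modelled with a plain ThreadLocal standing in for the tracer's thread-bound state. This is a hypothetical sketch, not how ObservationPropagationChannelInterceptor is implemented internally: the producer copies the current trace into the message, and the consumer thread opens a scope from it while handling and restores its previous state afterwards:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of propagation through a stored message; NOT the real interceptor.
public class PropagationSketch {

    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    record Msg(String payload, Map<String, String> headers) {}

    // Producer thread (analogous to ChannelInterceptor.preSend): copy the current trace into the message.
    static Msg preSend(Msg msg) {
        Map<String, String> headers = new HashMap<>(msg.headers());
        String trace = CURRENT_TRACE.get();
        if (trace != null) {
            headers.put("trace", trace);
        }
        return new Msg(msg.payload(), headers);
    }

    // Consumer thread (analogous to beforeHandle/afterMessageHandled): open a scope, then restore.
    static String handleWithScope(Msg msg) {
        String previous = CURRENT_TRACE.get();
        CURRENT_TRACE.set(msg.headers().get("trace"));
        try {
            return CURRENT_TRACE.get(); // the real handler would run here, inside the producer's trace
        } finally {
            if (previous == null) {
                CURRENT_TRACE.remove();
            } else {
                CURRENT_TRACE.set(previous);
            }
        }
    }

    static String traceSeenByPoller(String producerTrace) {
        BlockingQueue<Msg> channel = new LinkedBlockingQueue<>();
        CURRENT_TRACE.set(producerTrace);
        try {
            channel.add(preSend(new Msg("a.csv", Map.of())));
        } finally {
            CURRENT_TRACE.remove();
        }
        ExecutorService poller = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> handleWithScope(channel.poll()), poller).join();
        } finally {
            poller.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("poller sees: " + traceSeenByPoller("trace-123")); // prints "poller sees: trace-123"
    }
}
```

Restoring the previous state in finally matters because poller threads are reused: without it, the next message handled on the same thread would inherit a stale trace.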