I am trying to set up an OpenTelemetry tracer so that the current span propagates to the threads spawned by a ThreadPoolTaskExecutor, which is configured as follows:
private DgsDataLoaderAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("threadAsync");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setTaskDecorator(new DataLoaderTaskDecorator());
executor.initialize();
executorInstance = executor;
// spring tracer is obtained here to extract the current span later on
tracer = GlobalOpenTelemetry.getTracer("org.springframework.boot");
}
Later on, I retrieve the executor through a static accessor, and every task passes through a TaskDecorator:
/**
* Only one (or no) Executor will live in the service at all times. It will be retrieved in a thread safe manner
* @return executor to be used in the DataLoader
*/
public static synchronized Executor get() {
if (instance == null) {
instance = new DgsDataLoaderAsyncExecutor();
}
return instance.getExecutorInstance();
}
static class DataLoaderTaskDecorator implements TaskDecorator {
/**
* Currently, tenant, security, and locale contexts are passed
*/
@Override
public Runnable decorate(Runnable runnable) {
// Current web thread context to be passed to the async thread
var currentSpanContext = Context.current();
var currentSpan = Span.current();
Locale locale = LocaleContextHolder.getLocale();
log.debug("Saving information for async thread...");
return () -> {
// this span shows up in the parent trace as expected
Span asyncSpan = tracer.spanBuilder("DGS Executor async op")
.setParent(currentSpanContext.with(currentSpan))
.startSpan();
try {
// Async thread context
LocaleContextHolder.setLocale(locale);
log.debug("Restoring information for async thread...");
// however the spans in the thread fail to obtain the parent trace and are not linked to it
runnable.run();
} catch (Exception e) {
log.error("Error in async task", e);
} finally {
asyncSpan.end();
log.debug("DgsDataLoader has finished async execution");
}
};
}
}
However, the span is not propagated to the thread, and the spans created in that thread get a new parent instead of joining the existing trace.
I have been able to get around that by creating a custom context holder in which I store the current span:
Span asyncSpan = tracer.spanBuilder("DGS Executor async op")
.setParent(currentSpanContext.with(currentSpan))
.startSpan();
try {
SpanThreadContextHolder.setTracingSpan(asyncSpan);
// Async thread context
LocaleContextHolder.setLocale(locale);
log.debug("Restoring information for async thread...");
runnable.run();
} catch (Exception e) {
log.error("Error in async task", e);
} finally {
SpanThreadContextHolder.removeTracingSpan();
But that comes with a side effect: the downstream code has to be aware of the holder and activate the span itself:
// inside runnable.run()
var span = SpanThreadContextHolder.getOpenTracingSpan();
if (span != null) {
Context.current().with(span).makeCurrent();
}
With all that said, my question is: is it possible to automatically propagate the span to the other threads spawned from the main thread, while the span itself lives in thread-local storage?
I have eventually found the answer here.
The executor has to be wrapped with the OpenTelemetry context, so that each task runs with the context that was current when it was submitted:
public ExecutorService wrapExecutor(ExecutorService executor) {
return Context.taskWrapping(executor);
}
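To illustrate why wrapping the executor helps, here is a minimal, stdlib-only sketch of the general idea: capture a thread-local value on the submitting thread and restore it on the worker thread around the task. It is not OpenTelemetry's implementation. The class name, the CURRENT_TRACE thread-local (a stand-in for the current Context), and the local taskWrapping helper are all hypothetical names used only for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ContextWrappingSketch {
    // Hypothetical stand-in for the thread-local "current context".
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    // Hypothetical helper: returns an Executor that captures the caller's value
    // when execute() is called and restores it around the task on the worker thread.
    static Executor taskWrapping(Executor delegate) {
        return task -> {
            String captured = CURRENT_TRACE.get(); // read on the submitting thread
            delegate.execute(() -> {
                String previous = CURRENT_TRACE.get(); // whatever the pooled thread had
                CURRENT_TRACE.set(captured);
                try {
                    task.run();
                } finally {
                    // Put the pooled thread back into its previous state.
                    if (previous == null) {
                        CURRENT_TRACE.remove();
                    } else {
                        CURRENT_TRACE.set(previous);
                    }
                }
            });
        };
    }

    // Runs one task on a single-thread pool and returns the value the worker observed.
    static String observedOnWorker(boolean wrap) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Executor executor = wrap ? taskWrapping(pool) : pool;
            CompletableFuture<String> seen = new CompletableFuture<>();
            CURRENT_TRACE.set("trace-123"); // set only on the submitting thread
            try {
                executor.execute(() -> seen.complete(CURRENT_TRACE.get()));
            } finally {
                CURRENT_TRACE.remove();
            }
            return seen.get(5, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("plain:   " + observedOnWorker(false));
        System.out.println("wrapped: " + observedOnWorker(true));
    }
}
```

With the plain pool the worker thread sees null, because thread-locals are not inherited by pooled threads; with the wrapped executor it sees trace-123. In the same way, downstream code running on a wrapped executor can read the current context without knowing about any custom holder.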