rabbitmqquarkusopen-telemetry

OpenTelemetry span recordException with Quarkus / RabbitMQ


I have been working on a Quarkus proto about messaging and observability, using rabbitMQ and opentelemetry. Everything works fine but now I am trying to get some error handling in the pipe.

So I have been doing this code :

    @Incoming("orientation-done")
    @Blocking
    public CompletionStage<Void> process(Message<byte[]> orientationResponse) throws InvalidProtocolBufferException {
        log.info("Received orientation response: {}", orientationResponse);
        OrientationUpdated message = OrientationUpdated.parseFrom(orientationResponse.getPayload());
        if (!message.getOrientationMessage().hasCer()) {
            log.info("MESSAGE NOT ACKED");
            Exception ex = new OrientationException("Orientation does not have a CER");
            Span.current()
                    .addEvent("Orientation failed!!!")
                    .setStatus(StatusCode.ERROR)
                    .recordException(ex);
            return orientationResponse.nack(ex, orientationResponse.getMetadata());
        }
        log.info("MESSAGE DELIVERED AND ACKNOWLEDGED with uuid {}", message.getOrientationMessage().getUuid());
        return orientationResponse.ack(orientationResponse.getMetadata());
    }

My problem is that the exception is not added to the existing span as I can see in the logs :

2024-10-14 16:54:23,434 INFO [ine.sof.asd.OrientationProcessor] (vert.x-worker-thread-1) Received orientation response: io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage@27748c0a 2024-10-14 16:54:23,435 INFO [ine.sof.asd.OrientationProcessor] (vert.x-worker-thread-1) MESSAGE NOT ACKED 2024-10-14 16:54:23,437 FINE [io.ope.sdk.tra.SdkSpan] (vert.x-worker-thread-1) Calling addEvent() on an ended Span. 2024-10-14 16:54:23,438 FINE [io.ope.sdk.tra.SdkSpan] (vert.x-worker-thread-1) Calling setStatus() on an ended Span. 2024-10-14 16:54:23,441 FINE [io.ope.sdk.tra.SdkSpan] (vert.x-worker-thread-1) Calling addEvent() on an ended Span.

I think the event add and recordException happens asynchronously and that it happens when the whole span has been terminated, so after the error handling treatment has been achieved. Do I assume right ? How can I correctly add a new exception event to my existing span ?


Solution

  • You ate trying to add things to a span that is already closed. Not sure about the code before and if there is something you can change. In any case, you can always create a new span:

        Span span = tracer.spanBuilder("orientation-response").startSpan();
        try(Scope scope = span.makeCurrent()) {
            if (!message.getOrientationMessage().hasCer()) {
                log.info("MESSAGE NOT ACKED");
                Exception ex = new OrientationException("Orientation does not have a CER");
                span.addEvent("Orientation failed!!!")
                        .recordException(ex);
                return orientationResponse.nack(ex, orientationResponse.getMetadata());
            }
            log.info("MESSAGE DELIVERED AND ACKNOWLEDGED with uuid {}", message.getOrientationMessage().getUuid());
            return orientationResponse.ack(orientationResponse.getMetadata());
            LOG.info("Message received");
        }
        catch (Exception e) {
            span.recordException(e);
        } finally {
            span.end();
        }
    

    Some things that you need to consider:

    1. If you record an exception, the status will already be an ERROR.
    2. Always operate the span inside a scope. This way, you now where it will end and it will always end.
    3. The tracer can be injected.