java, axon, micrometer, axon-framework

Tagging metrics from an Axon Framework MessageMonitor with the segment ID


For some background, we have a custom message monitor that is very similar to the EventProcessorLatencyMonitor included with Axon Framework >=4.1.

My understanding is that onMessageIngested would only ever be called if that processor has claimed a segment of the processing group. I'd like to include this segment ID as a tag on the metric, similar to how the processor name is tagged. With the way the MessageMonitor is instantiated, I'm not sure if this is possible.

Our use case is that sometimes only one segment of the processing group will have a problem (becoming blocked, a performance issue, etc.), and this would help us identify it a lot more easily.

On a practical note, I'd assume this tag can't be set at instantiation time, but would need to be set (and unset) at the time that a segment is claimed, since different segments can be claimed by the processor over its lifetime. In fact, maybe even that isn't practical, as I believe the processor's thread count could mean that multiple segments are claimed at once by the same processor.

We're currently using Axon Framework 4.5.2 (although hoping to update to 4.6.x soon). This is with JPA-based tracking event processors, without Axon Server.

I'm aware of GlobalMetricRegistry, and we have a class that extends it, overriding registerComponentWithDefaultTags and registerEventProcessor to use slightly different tags than usual. In here, we only have access to the componentName and componentType, which doesn't provide access to the segmentId.

The only way I can think of to make this possible is if the Function<Message<?>, Iterable<Tag>> tagsBuilder gets enhanced somehow to include more than just the Message, but also some information about the processor/token (i.e., the segment). But this would require API changes from Axon themselves. Maybe somebody else has a more feasible solution, or something that can be done with existing functionality?


Solution

  • I think I've got a straightforward solution for you, @kagof.

    Axon Framework's TrackingEventProcessor and PooledStreamingEventProcessor add the TrackingToken and segment identifier to the resources of the UnitOfWork. They do so under the key "Processor[" + builder.name + "]/SegmentId". The name field is the name you've given to the Event Processor. Most likely, you use the @ProcessingGroup annotation to provide this name.

    As the framework sets the current UnitOfWork as a thread local, you can retrieve it whenever one is active.

    Knowing this, you can do something like:

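    // Requires org.axonframework.messaging.unitofwork.CurrentUnitOfWork.
    // The stored value is not necessarily a String (the segment ID is numeric),
    // so convert it with String.valueOf instead of casting.
    String segmentId = "No SegmentId found";
    if (CurrentUnitOfWork.isStarted()) {
        segmentId = CurrentUnitOfWork.get()
                                     .resources()
                                     .entrySet()
                                     .stream()
                                     .filter(entry -> entry.getKey().endsWith("/SegmentId"))
                                     .map(entry -> String.valueOf(entry.getValue()))
                                     .findFirst()
                                     .orElse("No SegmentId found");
    }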
    

    I'm doing a filter above, as I assume you want a generic solution for all your Event Processors.

    In any case, I hope this helps you out!
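
For anyone who wants to unit test the lookup without Axon on the classpath, here is a minimal sketch of the same idea as a standalone helper. The class name `SegmentIdLookup` and both method names are hypothetical, not part of Axon Framework. The only things taken from the answer are the "Processor[<name>]/SegmentId" key format and the fact that the resources are a Map keyed by String. The sketch converts the value with String.valueOf on the assumption that the segment ID may be stored as a number rather than a String.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not an Axon class): extracts the segment ID from a
// UnitOfWork-style resources map.
public class SegmentIdLookup {

    private static final String SEGMENT_ID_SUFFIX = "/SegmentId";

    // Generic lookup: returns the first resource whose key ends with "/SegmentId",
    // so it works without knowing the processor name.
    public static Optional<String> findSegmentId(Map<String, Object> resources) {
        return resources.entrySet()
                        .stream()
                        .filter(entry -> entry.getKey().endsWith(SEGMENT_ID_SUFFIX))
                        // Convert rather than cast: the value may be an Integer.
                        .map(entry -> String.valueOf(entry.getValue()))
                        .findFirst();
    }

    // Exact lookup for one known processor, using the key format
    // "Processor[" + name + "]/SegmentId" described in the answer.
    public static Optional<String> findSegmentId(Map<String, Object> resources,
                                                 String processorName) {
        Object value = resources.get("Processor[" + processorName + "]" + SEGMENT_ID_SUFFIX);
        return Optional.ofNullable(value).map(v -> String.valueOf(v));
    }
}
```

Inside a monitor you would presumably call it as `SegmentIdLookup.findSegmentId(CurrentUnitOfWork.get().resources())` after checking `CurrentUnitOfWork.isStarted()`, and fall back to a placeholder tag value such as "unknown" when the Optional is empty. Because the UnitOfWork is thread-local, each processing thread should see the segment it is currently handling, which covers the case where one processor has claimed several segments.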