I want my Axon replay events, not all but partially.
A full replay is up and running but when i want a partially replay i need a TrackingToken startPosition
for the method resetTokens()
, my problem is how to get this token for the partial replay?
I tried with GapAwareTracingToken
but this does not work.
public void resetTokensWithRestartIndexFor(String trackingEventProcessorName, Long restartIndex) {
eventProcessingConfiguration
.eventProcessorByProcessingGroup(trackingEventProcessorName, TrackingEventProcessor.class)
.filter(trackingEventProcessor -> !trackingEventProcessor.isReplaying())
.ifPresent(trackingEventProcessor -> {
// shutdown this streaming processor
trackingEventProcessor.shutDown();
// reset the tokens to prepare the processor with start index for replay
trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(restartIndex - 1, Collections.emptySortedSet()));
// start the processor to initiate the replay
trackingEventProcessor.start();
});
}
When i use the GapAwareTrackingToken
then i get the exception:
[] - Resolved [java.lang.IllegalArgumentException: Incompatible token type provided.]
I see that there is also a GlobalSequenceTrackingToken
i can use, but i don't see any documentatieon about when these can/should be used.
The main "challenge" when doing a partial reset, is that you need to be able to tell where to reset to. In Axon, the position in a stream is defined with a TrackingToken.
The source that you read from will provide you with such a token with each event that it provides. However, when you're doing a reset, you probably didn't store the relevant token while you were consuming those events.
You can also create tokens using any StreamableMessageSource
. Generally, this is your Event Store, but if you read from other sources, it could be something else, too.
The StreamableMessageSource
provides 4 methods to create a token:
createHeadToken
- the position at the most recent edge of the stream, where only new events will be readcreateTailToken
- the position at the very beginning of the stream, allowing you to replay all events.createTokenAt(Instant)
- the most recent position in the stream that will return all events created on or after the given Instant
. Note that some events may still have a timestamp earlier than this timestamp, as event creation and event storage isn't guaranteed to be the same.createTokenSince(Duration)
- similar to createTokenAt
, but accepting an amount of time to go back.So in your case, createTokenAt
should do the trick.