In my application, user can trigger updates for a specific resource. When that happens, an UpdateInitiated
event is sent.
A MassTransit Saga is initiated based on that event, and it triggers a series of other events until it considers itself done and finalizes.
While an update is ongoing, users can make changes to the resource and request a new update. I'm trying to figure out how I can abort the ongoing Saga and create a new instance, when a new UpdateInitiated
event is sent for the same resource.
Two approaches I'm thinking about
Correlate on ResourceId. So the initial Saga would get the second UpdateInitiated
event as well. But then I don't know how to reset the saga (and stop listening for events related to the previous update).
Use CorrelatedBy<Guid>
(as I am doing now). This would create a new Saga for the new update event. But then I don't know how to cancel the initial Saga.
Is there a way to listen to arbitrary events in a Saga? Like: "Listen to all UpdatedInitiated
events even if they don't correlate, and cancel the current Saga if there is an event with the matching ResourceId".
I realized that it's possible to register for events where you correlate on something other than the CorrelationId
. In my use case, I want to abort an ongoing saga if there is a new event kicking off a new saga, for the same ResourceId
.
What I did was this:
// Listen to update initiated events for the same ResourceId
Event(() => UpdateInitiated , e => e
.CorrelateBy((instance, context) =>
instance.CurrentState != 2 &&
instance.CorrelationId != context.Message.CorrelationId &&
instance.ResourceId == context.Message.ResourceId
)
.SelectId(x => NewId.NextGuid()));
// Finalize this saga if there is a newer one for the same ResourceId
DuringAny(
When(UpdateInitiated)
.Then(context => Log.Information("Saga aborted due to newer update initiated by {correlationId}", context.Data.CorrelationId))
.Finalize());