I'm trying to configure a MassTransit StateMachine saga that should be triggered when a message is sent, but sending the message does not initiate the saga; the message remains in the queue. If I publish the message, it works, but I think that for this use-case, sending the message is the right approach.
What am I missing here?
I have the strong feeling that the Consumer Endpoint for the InitiateAccountDeletion queue is not being configured, but I would expect the ConfigureEndpoints() method to do so.
public class DeleteAccountSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string? CurrentState { get; set; }
public int FilesState { get; set; }
}
public class DeleteAccountStateMachine : MassTransitStateMachine<DeleteAccountSaga>
{
// Files are being deleted.
public State DeletingFiles { get; }
// Files were deleted.
public State FilesDeletionCompleted { get; }
// The account and related data is being deleted from the database.
public State DeletingAccount { get; }
// The account and related data have been deleted from the database.
public State AccountDeletionCompleted { get; }
// This message should initiate the saga.
public Event<InitiateAccountDeletion> InitiateAccountDeletion { get; }
// This event indicates that files from a specific server (T1) were deleted.
public Event<AccountFilesT1Deleted> AccountFilesT1Deleted { get; }
// This event indicates that files from a specific server (T2) were deleted.
public Event<AccountFilesT2Deleted> AccountFilesT2Deleted { get; }
// Composite event that indicates that files from all servers were deleted.
public Event AccountFilesDeleted { get; }
// This event indicates that the database records were deleted.
public Event<AccountDeleted> AccountDeleted { get; }
public DeleteAccountStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => InitiateAccountDeletion, x =>
{
x.InsertOnInitial = true;
x.CorrelateById(context => context.Message.AccountId);
x.SetSagaFactory(context => new()
{
CorrelationId = context.Message.AccountId
});
});
Event(() => AccountFilesT1Deleted, x => x.CorrelateById(context => context.Message.AccountId));
Event(() => AccountFilesT2Deleted, x => x.CorrelateById(context => context.Message.AccountId));
Event(() => AccountDeleted, x => x.CorrelateById(context => context.Message.AccountId));
// The saga should be started when the InitiateAccountDeletion message is sent.
// After, it should send DeleteFilesT1 and DeleteFilesT2 messages so that files from corresponsing servers are deleted.
Initially(
When(InitiateAccountDeletion)
.Send(context => new DeleteFilesT1(context.Message.AccountId))
.Send(context => new DeleteFilesT2(context.Message.AccountId))
.TransitionTo(DeletingFiles)
);
// Indicate that all files were deleted.
CompositeEvent(() => FilesDeleted, x => x.FilesState,
AccountFilesT1Deleted,
AccountFilesT2Deleted
);
// When all files have been deleted, send DeleteAccount message which will delete all data from the database.
During(DeletingFiles,
Ignore(InitiateAccountDeletion),
When(FilesDeleted)
.TransitionTo(FilesDeletionCompleted)
.Send(context => new DeleteAccount(context.Saga.CorrelationId))
);
During(FilesDeletionCompleted,
When(AccountDeleted)
.TransitionTo(AccountDeletionCompleted)
.Finalize()
);
SetCompletedWhenFinalized();
}
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// [...]
EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
EndpointConvention.Map<DeleteFilesT1>(new Uri($"queue:{nameof(DeleteFilesT1)}"));
EndpointConvention.Map<DeleteFilesT2>(new Uri($"queue:{nameof(DeleteFilesT2)}"));
EndpointConvention.Map<DeleteAccount>(new Uri($"queue:{nameof(DeleteAccount)}"));
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<AccountDbContext>(options =>
{
options.UsePostgres();
options.UseBusOutbox();
});
// This registers:
// - DeleteFilesT1Consumer
// - DeleteFilesT2Consumer
// - DeleteAccountConsumer
x.AddConsumers(typeof(Startup).Assembly);
x.AddSagaStateMachine<DeleteAccountStateMachine, DeleteAccountSaga>()
.EntityFrameworkRepository(options =>
{
options.ExistingDbContext<AccountDbContext>();
options.UsePostgres();
});
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(configuration["AWS:Region"], (_) => { });
cfg.ConfigureEndpoints(context);
});
});
}
}
public class AccountController : ControllerBase
{
[HttpDelete("{accountId}")]
public async Task<IActionResult> Delete(Guid accountId)
{
var account = await _dbContext.Account.FindAsync(accountId);
if (account == null)
return NotFound("...");
account.Status = "Deleting";
// This message remains in the InitiateAccountDeletion queue. It doesn't get processed by the StateMachine.
await _sendEndpointProvider.Send<InitiateAccountDeletion>(new(
AccountId: account.AccountId
));
await _dbContext.SaveChangesAsync();
return Accepted();
}
}
It works by changing this EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
to this EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(DeleteTenant)}"));
With the default endpoint name formatter, the saga state machine will be configured on a receive endpoint named DeleteAccountSaga
- which is not where you are sending the message.
I'd suggest increasing the log level and reviewing the logs to see how each endpoint is configured and where messages are being sent. It's clear that the endpoint convention you're configuring for that message type is not the correct destination.
It's also why Publish works since MassTransit knows how to properly route the message to the saga.