entity-framework-core, state-machine, masstransit, saga, automatonymous

Multiple state machines with Automatonymous using EF Core persistence not working


We are using MassTransit with Automatonymous and InMemoryRepository for saga persistence. We have three state machines configured, and they were working perfectly. We recently changed from InMemoryRepository to EF Core for persistence. As a result, only the first configured state machine works; the remaining state machines do not even enter their Initially block. We need help understanding whether the implementation is correct. The code details are below:

MassTransit state machine setup:

 // Registers MassTransit: a RabbitMQ bus with one receive endpoint that hosts all
 // three sagas, plus one Entity Framework saga repository per state machine.
 services.AddMassTransit(x =>
        {
            x.AddBus(provider => MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                // Broker address and credentials come from hostUri / appSettings.RabbitMQ.
                cfg.Host(hostUri, hst =>
                {
                    hst.Username(appSettings.RabbitMQ.Username);
                    hst.Password(appSettings.RabbitMQ.Password);
                });

                // A single queue, "microservice-response", carries the consumers and all three sagas.
                cfg.ReceiveEndpoint("microservice-response", e =>
                {
                    // AddConsumers is defined elsewhere (not shown in this snippet).
                    AddConsumers(e, provider);
                    e.ConfigureSaga<ServiceRequest1RegisterState>(provider);
                    e.ConfigureSaga<ServiceRequest1UpdateState>(provider);
                    e.ConfigureSaga<ServiceRequest1ApproveState>(provider);
                });
            }));

            // Previous in-memory configuration, kept for reference:
            ////x.AddSagaStateMachine<ServiceRequest1RegisterStateMachine, ServiceRequest1RegisterState>()
            ////   .InMemoryRepository();
            ////x.AddSagaStateMachine<ServiceRequest1UpdateStateMachine, ServiceRequest1UpdateState>()
            ////   .InMemoryRepository();
            ////x.AddSagaStateMachine<ServiceRequest1ApproveStateMachine, ServiceRequest1ApproveState>()
            ////   .InMemoryRepository();

            // Register saga: EF repository backed by ServiceRequest1RegisterStateDbContext.
            x.AddSagaStateMachine<ServiceRequest1RegisterStateMachine, ServiceRequest1RegisterState>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

                    // NOTE(review): all three repositories in this snippet register their context under the
                    // same service type, DbContext. Presumably only the first such registration is resolved,
                    // which would match the reported symptom (only the first saga works) - confirm against
                    // the MassTransit EF Core documentation.
                    r.AddDbContext<DbContext, ServiceRequest1RegisterStateDbContext>((provider, builder) =>
                    {
                        builder.UseSqlServer(configuration.GetConnectionString("StateDBConnection"), m =>
                        {
                            // Migrations live in the executing assembly; the history table is named per context.
                            m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                            m.MigrationsHistoryTable($"__{nameof(ServiceRequest1RegisterStateDbContext)}");
                        });
                    });
                });

    // Update saga: same pattern, backed by ServiceRequest1UpdateStateDbContext.
    x.AddSagaStateMachine<ServiceRequest1UpdateStateMachine, ServiceRequest1UpdateState>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

                    // Registered under the base DbContext service type again (see the review note on the first saga).
                    r.AddDbContext<DbContext, ServiceRequest1UpdateStateDbContext>((provider, builder) =>
                    {
                        builder.UseSqlServer(configuration.GetConnectionString("StateDBConnection"), m =>
                        {
                            m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                            m.MigrationsHistoryTable($"__{nameof(ServiceRequest1UpdateStateDbContext)}");
                        });
                    });
                });

            // Approve saga: same pattern, backed by ServiceRequest1ApproveStateDbContext.
            x.AddSagaStateMachine<ServiceRequest1ApproveStateMachine, ServiceRequest1ApproveState>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

                    // Registered under the base DbContext service type again (see the review note on the first saga).
                    r.AddDbContext<DbContext, ServiceRequest1ApproveStateDbContext>((provider, builder) =>
                    {
                        builder.UseSqlServer(configuration.GetConnectionString("StateDBConnection"), m =>
                        {
                            m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                            m.MigrationsHistoryTable($"__{nameof(ServiceRequest1ApproveStateDbContext)}");
                        });
                    });
                });
        });

        // Registers MassTransitBusService as a hosted service (presumably it starts and stops the bus - not shown here).
        services.AddSingleton<IHostedService, MassTransitBusService>();

The DbContext looks like the following for all three state machines:

 /// <summary>
 /// Saga <c>DbContext</c> for the approve state machine. It contributes a single
 /// class map, <see cref="ServiceRequest1ApproveStateMap"/>.
 /// </summary>
 public class ServiceRequest1ApproveStateDbContext : SagaDbContext
 {
     /// <summary>
     /// Creates the context from the supplied options, which are passed straight to the base class.
     /// </summary>
     /// <param name="options">Options typed for this context.</param>
     public ServiceRequest1ApproveStateDbContext(
         DbContextOptions<ServiceRequest1ApproveStateDbContext> options)
         : base(options)
     {
     }

     /// <summary>
     /// Gets the saga class maps for this context: one <see cref="ServiceRequest1ApproveStateMap"/>.
     /// </summary>
     protected override IEnumerable<ISagaClassMap> Configurations
     {
         get
         {
             // Lazily yields a fresh map each time the sequence is enumerated.
             yield return new ServiceRequest1ApproveStateMap();
         }
     }
 }

State map below

/// <summary>
/// Entity mapping for <see cref="ServiceRequest1ApproveState"/>.
/// </summary>
public class ServiceRequest1ApproveStateMap : SagaClassMap<ServiceRequest1ApproveState>
{
    /// <summary>
    /// Maps the saga state properties; <c>CurrentState</c> is capped at 64 characters.
    /// </summary>
    /// <param name="entity">Builder for the saga state entity.</param>
    /// <param name="model">The model builder (not used by this mapping).</param>
    protected override void Configure(
        EntityTypeBuilder<ServiceRequest1ApproveState> entity,
        ModelBuilder model)
    {
        entity
            .Property(state => state.CurrentState)
            .HasMaxLength(64);

        // The remaining properties are mapped without extra constraints.
        entity.Property(state => state.Id);
        entity.Property(state => state.ServiceId);
        entity.Property(state => state.ReadyEventStatus);
    }
}

With the above code, only ServiceRequest1RegisterStateMachine works; the other state machines don't even enter Initially. I can confirm that all migrations were applied beforehand, and that each state machine works fine on its own when only one repository configuration exists — i.e., if only the repository configuration for ServiceRequest1ApproveStateMachine is present, that state machine works. But if all three repository configurations exist, only the first one works. I need guidance on correctly implementing saga persistence using EF Core. Should I try using a single DbContext, as described in the MassTransit docs (https://masstransit-project.com/usage/sagas/efcore.html)?


Solution

  • Below are the changes I made to get the state machines working as expected. They are based on @Chris's suggestion in the comments, even though our initial implementation followed the docs on SQL Server EF Core persistence.

    // Each saga DbContext is registered with the container under its own concrete type,
    // all pointing at the "StateDBConnection" connection string.
    services.AddDbContext<ServiceRequest1RegisterStateDbContext>(options =>
        options.UseSqlServer(configuration.GetConnectionString("StateDBConnection")));
    services.AddDbContext<ServiceRequest1UpdateStateDbContext>(options =>
        options.UseSqlServer(configuration.GetConnectionString("StateDBConnection")));
    services.AddDbContext<ServiceRequest1ApproveStateDbContext>(options =>
        options.UseSqlServer(configuration.GetConnectionString("StateDBConnection")));

    services.AddMassTransit(registration =>
    {
        // RabbitMQ bus with one receive endpoint shared by the consumers and all three sagas.
        registration.AddBus(serviceProvider => MassTransit.Bus.Factory.CreateUsingRabbitMq(bus =>
        {
            bus.Host(hostUri, host =>
            {
                host.Username(appSettings.RabbitMQ.Username);
                host.Password(appSettings.RabbitMQ.Password);
            });

            bus.ReceiveEndpoint("microservice-response", endpoint =>
            {
                AddConsumers(endpoint, serviceProvider);
                endpoint.ConfigureSaga<ServiceRequest1RegisterState>(serviceProvider);
                endpoint.ConfigureSaga<ServiceRequest1UpdateState>(serviceProvider);
                endpoint.ConfigureSaga<ServiceRequest1ApproveState>(serviceProvider);
            });
        }));

        // The state machines were previously registered with .InMemoryRepository();
        // each one now uses an Entity Framework repository bound to its own, already
        // registered DbContext type.

        registration
            .AddSagaStateMachine<ServiceRequest1RegisterStateMachine, ServiceRequest1RegisterState>()
            .EntityFrameworkRepository(repository =>
            {
                // Pessimistic locking; Optimistic would require a RowVersion.
                repository.ConcurrencyMode = ConcurrencyMode.Pessimistic;
                repository.ExistingDbContext<ServiceRequest1RegisterStateDbContext>();
            });

        registration
            .AddSagaStateMachine<ServiceRequest1UpdateStateMachine, ServiceRequest1UpdateState>()
            .EntityFrameworkRepository(repository =>
            {
                // Pessimistic locking; Optimistic would require a RowVersion.
                repository.ConcurrencyMode = ConcurrencyMode.Pessimistic;
                repository.ExistingDbContext<ServiceRequest1UpdateStateDbContext>();
            });

        registration
            .AddSagaStateMachine<ServiceRequest1ApproveStateMachine, ServiceRequest1ApproveState>()
            .EntityFrameworkRepository(repository =>
            {
                // Pessimistic locking; Optimistic would require a RowVersion.
                repository.ConcurrencyMode = ConcurrencyMode.Pessimistic;
                repository.ExistingDbContext<ServiceRequest1ApproveStateDbContext>();
            });
    });

    // Registers MassTransitBusService as a hosted service.
    services.AddSingleton<IHostedService, MassTransitBusService>();
    

    }