Is it possible to update a saga after receiving an event without a transition to a new state?
I tried this, but it failed due to some kind of recursive problem:
During(CalculateStarted,
When(CalculateLogMessage)
.Then(context => context.Saga.Logs.AddRange(context.Message.Logs))
);
My intention is to save the logs of a long-running external process in the saga. The only other option I know of is to use a consumer that stores these events in the database.
Update1: If I use the above code, this situation will occur at some point:
2024-08-09T12:30:32.7074683Z Maximum destructuring depth reached.
2024-08-09T12:30:32.7016261Z Maximum destructuring depth reached.
2024-08-09T12:30:32.7075366Z Maximum destructuring depth reached.
...
But only if I change the saga. If I only read from the saga, then that won't happen.
Furthermore: Normally the log messages (and other events) arrive every few seconds. But when debugging, I noticed that the messages also arrive all at once due to the time spent inside the breakpoints and me restarting the application randomly. I guess I need to switch to pessimistic locking.
Update2: I have a concurrency problem. Switching to pessimistic locking and using a concurrency limit of 1 works. But with higher limits the DB queries are failing. I wonder why. I thought that pessimistic locking would handle such things by itself.
public static void AddSagas(this IBusRegistrationConfigurator configurator)
{
configurator.AddSagaStateMachine<CalculationRunStateMachine, CalculationRunState>()
.Endpoint(cfg =>
{
const int concurrencyLimit = 5;
cfg.PrefetchCount = concurrencyLimit;
cfg.ConcurrentMessageLimit = concurrencyLimit;
})
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Pessimistic;
r.ExistingDbContext<MyDbContext>();
r.UsePostgres();
});
}
@Chris: Btw this behavior with "destructing" doesn't seem to be related to Serilog. When I lock pessimistic, I get the Postgrtes exceptions. If I switch to optimistic, I get this "destructing" isses. Even after changing the code to this:
During(CalculateStarted,
When(CalculateLogMessage)
.Then(context =>
{
context.Saga.LastModifiedUtc = DateTime.UtcNow;
})
);
Sorry for this, can not figure out how to collapse it:
An exception occurred while iterating over the results of a query for context type '"My.Data.Database.MyDbContext"'."\r\n""System.InvalidOperationException: An exception has been raised that
is likely due to a transient failure.\r\n ---> Npgsql.PostgresException (0x80004005): 40001: could not serialize access due to concurrent update\r\n at Npgsql.Internal.NpgsqlConnector.ReadMessageLong(Boolean async, DataRowLoadingMode dataRowLoad
ingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)\r\n at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 toke
n)\r\n at Npgsql.NpgsqlDataReader.NextResult(Boolean async, Boolean isConsuming, CancellationToken cancellationToken)\r\n at Npgsql.NpgsqlDataReader.NextResult(Boolean async, Boolean isConsuming, CancellationToken cancellationToken)\r\n at N
pgsql.NpgsqlCommand.ExecuteReader(Boolean async, CommandBehavior behavior, CancellationToken cancellationToken)\r\n at Npgsql.NpgsqlCommand.ExecuteReader(Boolean async, CommandBehavior behavior, CancellationToken cancellationToken)\r\n at Npgs
ql.NpgsqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)\r\n at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, Cancella
tionToken cancellationToken)\r\n at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)\r\n at Microsoft.EntityFrameworkCore.Query.In
ternal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(AsyncEnumerator enumerator, CancellationToken cancellationToken)\r\n at Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.NpgsqlExecutionStrategy.ExecuteAsync[TState,TR
esult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)\r\n Exception data:\r\n Severity: ERROR\r\n SqlState: 40001\r\n MessageText: could not serialize access due to concurrent update\r\n F
ile: nodeLockRows.c\r\n Line: 227\r\n Routine: ExecLockRows\r\n --- End of inner exception stack trace ---\r\n at Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.NpgsqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)\r\n at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()" [Microsoft.EntityFrameworkCore.Query]
System.InvalidOperationException: An exception has been raised that is likely due to a transient failure.
---> Npgsql.PostgresException (0x80004005): 40001: could not serialize access due to concurrent update
at Npgsql.Internal.NpgsqlConnector.ReadMessageLong(Boolean async, DataRowLoadingMode dataRowLoadingMode, Boolean readingNotifications, Boolean isReadingPrependedMessage)
at System.Runtime.CompilerServices.PoolingAsyncValueTaskMethodBuilder`1.StateMachineBox`1.System.Threading.Tasks.Sources.IValueTaskSource<TResult>.GetResult(Int16 token)
at Npgsql.NpgsqlDataReader.NextResult(Boolean async, Boolean isConsuming, CancellationToken cancellationToken)
at Npgsql.NpgsqlDataReader.NextResult(Boolean async, Boolean isConsuming, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(Boolean async, CommandBehavior behavior, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteReader(Boolean async, CommandBehavior behavior, CancellationToken cancellationToken)
at Npgsql.NpgsqlCommand.ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Storage.RelationalCommand.ExecuteReaderAsync(RelationalCommandParameterObject parameterObject, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.InitializeReaderAsync(AsyncEnumerator enumerator, CancellationToken cancellationToken)
at Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.NpgsqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
Exception data:
Severity: ERROR
SqlState: 40001
MessageText: could not serialize access due to concurrent update
File: nodeLockRows.c
Line: 227
Routine: ExecLockRows
--- End of inner exception stack trace ---
at Npgsql.EntityFrameworkCore.PostgreSQL.Storage.Internal.NpgsqlExecutionStrategy.ExecuteAsync[TState,TResult](TState state, Func`4 operation, Func`4 verifySucceeded, CancellationToken cancellationToken)
at Microsoft.EntityFrameworkCore.Query.Internal.SingleQueryingEnumerable`1.AsyncEnumerator.MoveNextAsync()
Not sure what caused the problem, but after introducing a SagaDefinition it disappeared.
I changed this code:
configurator.AddSagaStateMachine<CalculationStateMachine, CalculationState>()
.Endpoint(cfg =>
{
const int concurrencyLimit = 5;
cfg.PrefetchCount = concurrencyLimit;
cfg.ConcurrentMessageLimit = concurrencyLimit;
})
.EntityFrameworkRepository(r =>
{
r.UsePostgres();
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<MyDbContext>();
});
To this, which is using a Partitioner:
configurator.AddSagaStateMachine<CalculationStateMachine, CalculationState, CalculationStateMachineSagaDefinition>()
// .Endpoint(cfg =>
// {
// const int concurrencyLimit = 5;
// cfg.PrefetchCount = concurrencyLimit;
// cfg.ConcurrentMessageLimit = concurrencyLimit;
// })
.EntityFrameworkRepository(r =>
{
r.UsePostgres();
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.ExistingDbContext<MyDbContext>();
});
public class CalculationStateMachineSagaDefinition : SagaDefinition<CalculationState> {
private const int ConcurrencyLimit = 5;
public CalculationStateMachineSagaDefinition()
{
Endpoint(e => { e.PrefetchCount = ConcurrencyLimit; });
}
protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
ISagaConfigurator<CalculationState> sagaConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500));
endpointConfigurator.UseInMemoryOutbox(context);
var partition = endpointConfigurator.CreatePartitioner(ConcurrencyLimit);
sagaConfigurator.Message<CalculateLogMessage>(s => s.UsePartitioner(partition, m => m.Message.CorrelationId));
sagaConfigurator.Message<CalculateFailedMessage>(s => s.UsePartitioner(partition, m => m.Message.CorrelationId));
sagaConfigurator.Message<CalculateSucceededMessage>(s => s.UsePartitioner(partition, m => m.Message.CorrelationId));
} }
Have yet to find out if I need to put all events into UsePartitioner or just the tree that may collide.