entity-framework-core esb masstransit saga devart

MassTransit Entity Framework saga management


I have been trying to use the Devart PostgreSQL EF Core provider, as it is what the company I work at uses. As far as I can tell, the Npgsql provider works fine with MassTransit.

However, I almost immediately ran into a problem with MassTransit and Devart:

Devart.Data.PostgreSql.PgSqlException (0x80004005): column "p0" does not exist

I tracked this down to the PostgresLockStatementFormatter.AppendColumn method.

https://github.com/MassTransit/MassTransit/blob/develop/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/PostgresLockStatementFormatter.cs

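MassTransit uses this formatter to build a SQL statement that the DbContext executes through FromSqlRaw. The call is essentially the following (shown here with the :p0 placeholder; MassTransit's built-in formatter emits @p0 in its place):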

var queryable = dbContext.Set<MyEntity>().FromSqlRaw(" SELECT * FROM \"my_entity\" WHERE \"correlation_id\" = :p0 FOR UPDATE", Guid.NewGuid());

It seems that the Devart provider does not accept parameters written as @p0. The format it expects is :p0.

My approach was to replace the lock statement formatter with my own Devart-specific implementation of ILockStatementFormatter:

public class DevArtPostgresLockStatementFormatter : ILockStatementFormatter
{
    public void Create(StringBuilder sb, string schema, string table)
    {
        sb.AppendFormat("SELECT * FROM {0} WHERE ", FormatTableName(schema, table));
    }

    public void AppendColumn(StringBuilder sb, int index, string columnName)
    {
        if (index == 0)
            sb.AppendFormat("\"{0}\" = :p0", columnName);
        else
            sb.AppendFormat(" AND \"{0}\" = :p{1}", columnName, index);
    }

    public void Complete(StringBuilder sb)
    {
        sb.Append(" FOR UPDATE");
    }

    public void CreateOutboxStatement(StringBuilder sb, string schema, string table, string columnName)
    {
        sb.AppendFormat(@"SELECT * FROM {0} ORDER BY ""{1}"" LIMIT 1 FOR UPDATE SKIP LOCKED", FormatTableName(schema, table), columnName);
    }

    static string FormatTableName(string schema, string table)
    {
        return string.IsNullOrEmpty(schema) ? $"\"{table}\"" : $"\"{schema}\".\"{table}\"";
    }
}

It then has to be registered in the service configuration, through a lock statement provider that uses the formatter:

x.AddSagaRepository<MyStateMachine>()
    .EntityFrameworkRepository(r =>
    {
        r.LockStatementProvider = new DevArtPostgresLockStatementProvider();
        r.ExistingDbContext<MyDbContext>();
    });
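
The DevArtPostgresLockStatementProvider referenced in the registration is not shown here. A minimal sketch of what it might look like, assuming MassTransit's SqlLockStatementProvider base class takes a formatter and a schema-caching flag in its constructor (which is how the built-in PostgresLockStatementProvider appears to be wired up). The constructor signature should be checked against the MassTransit version in use:

```csharp
using MassTransit.EntityFrameworkCoreIntegration;

// Hypothetical provider: the class name comes from the registration above,
// the body is an assumption modelled on PostgresLockStatementProvider.
public class DevArtPostgresLockStatementProvider : SqlLockStatementProvider
{
    public DevArtPostgresLockStatementProvider(bool enableSchemaCaching = true)
        : base(new DevArtPostgresLockStatementFormatter(), enableSchemaCaching)
    {
    }
}
```

Deriving from the shared base class keeps the table and column lookup logic in MassTransit, so only the SQL text differs from the Npgsql case.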

This seems to fix my issue. Is there anywhere else the lock statement is used that I should be aware of?

The only other usage I could find was in the durable outbox feature:

https://github.com/MassTransit/MassTransit/blob/develop/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/EntityFrameworkOutboxContextFactory.cs

I can test this, but I thought it was worth asking in case anyone has run into similar issues, or in case it helps someone.

Thanks


Solution

  • As you have found by searching the codebase, the lock statement provider is used by both the saga repository and transactional outbox. It's the only dialect-specific component, and the right place to tweak the SQL for different providers.
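
For the outbox side, a sketch of how the same provider might be plugged in, assuming the outbox configurator exposes a LockStatementProvider property in the same way the saga repository configurator does. The built-in UsePostgres() helper appears to set that property to the Npgsql-oriented provider, so here it is replaced rather than combined. MyDbContext and DevArtPostgresLockStatementProvider are the names used in the question:

```csharp
x.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
    // Assumption: set the provider directly instead of calling o.UsePostgres(),
    // so the outbox uses the Devart-specific formatter for its lock statements.
    o.LockStatementProvider = new DevArtPostgresLockStatementProvider();

    o.UseBusOutbox();
});
```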