masstransitstate-machinesaga

Waiting for reponse from MassTransit Saga state machine


The MassTransit documentation for Saga State Machines (Respond section)(https://masstransit.io/documentation/patterns/saga/state-machine) mentions that it's possible to initiate a state machine instance and then wait until that instance finalises before responding to the original request.

There are scenarios where it is required to wait for the response from the state machine. In these scenarios the information that is required to respond to the original request should be stored.

The documentation then gives examples of how to setup the state machine instance with enough details of the original request in order to respond, as well as the sending of the response itself.

public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;
public record ProcessOrder(Guid OrderId, Guid ProcessingId);
public record OrderProcessed(Guid OrderId, Guid ProcessingId);
public record OrderCancelled(Guid OrderId, string Reason);
public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
    }
}
public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? ProcessingId { get; set; }
    public Guid? RequestId { get; set; }
    public Uri ResponseAddress { get; set; }
    public Guid OrderId { get; set; }
}
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Created { get; set; }
    
    public State Cancelled { get; set; }
    
    public Event<CreateOrder> OrderSubmitted { get; set; }
    
    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }
    
    public OrderStateMachine()
    {
        InstanceState(m => m.CurrentState);
        Event(() => OrderSubmitted);
        Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });
        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.ProcessingId = Guid.NewGuid();
                    context.Saga.OrderId = Guid.NewGuid();
                    context.Saga.RequestId = context.RequestId;
                    context.Saga.ResponseAddress = context.ResponseAddress;
                })
                .Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
                .TransitionTo(ProcessOrder.Pending));
        
        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .TransitionTo(Created)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(context.Saga, r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.Faulted)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Faulted"), r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Time-out"), r => r.RequestId = context.Saga.RequestId);
                }));
    }
}

There are no examples of how allow the client to initiate and wait for the state machine and this is what I am uncertain about.

After a lot of changes I am currently sending the request from the client like

var correlationId = NewId.NextGuid();

var response = await _client.GetResponse<MyResponse>(new OrderSubmitted
            {
                CorrelationId = correlationId,
            }, cancellationToken)

which is successfully creating the state machine instance and is waiting for the response. The state machine carries out it's activities and then attempts to respond to the client:

.ThenAsync(async context =>
            {
                var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                await endpoint.Send(context.Saga, r => r.RequestId = context.Saga.RequestId);
            })

but the client is left waiting and eventually times-out.

I can't seem to get the request and the response to tie-in together.


Solution

  • Instead of using .ThenAsync to send the response, you should use the saga state machines built-in .SendAsync method as shown below:

    .SendAsync(context => context.Saga.ResponseAddress,
        context => new MyResponse
        {
            Value = context.Saga.SavedValue
        }), (context, sendContext) => sendContext.RequestId = context.Saga.RequestId)
    

    Also, in the code you posted, you were responding with the saga instance, instead of MyResponse.