.netapache-kafkamasstransitsaga

Catch and handle exception thrown by consumer inside the Saga


I'm working on a POC that uses Kafka and applies the Saga pattern. I was abble to create a happy path in which I can consume messages from a topic and send it to multiple services through other Kafka topics. Now I'm trying to handle exceptions during the consuption of a message.

As you can see, I have a simple consumer of the type OrderRequestEvent. This consumer receives a message and then validates it with this apiService. If the validation fails, it should throw an exception. My idea is to catch/handle this exception inside the saga, change the state and send it to another topic for possible retries, without finalizing the saga.

Here goes my consumer:

  public class OrderManagementSystemConsumer : IConsumer<OrderRequestEvent>
    {
        private readonly ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent;
        private readonly IApiService apiService;

        public OrderManagementSystemConsumer(
            ITopicProducer<CustomerValidationRequestEvent> customerValidationResponseEvent, 
            IApiService apiService)
        {
            this.customerValidationResponseEvent = customerValidationResponseEvent;
            this.apiService = apiService;
        }

        public async Task Consume(ConsumeContext<OrderRequestEvent> context)
        {
            ArgumentNullException.ThrowIfNull(context, nameof(context));

            if (await this.apiService.ValidateIncomingRequestAsync(context.Message))
                throw new ArgumentException("Something wrong just happened");
        }
    }

Part of my saga which handles the OrderRequestEvent

 Initially(
            When(OrderRequestedEvent)
                .Then(context => LogContext.Info?.Log("Initializing saga: {0}", context.Saga.CorrelationId))
                .InitializeSaga()
                .Then(context => LogContext.Info?.Log("Validating Customer: {0}", context.Saga.CorrelationId))
                .SendingToCustomerValidation().LogSaga()
                .TransitionTo(ValidatingCustomer));

I noticed that even though my exception is properly thrown inside the consumer, the saga continues normally. Apparently, exceptions thrown in the consumer do not have any effect on the saga.


Solution

  • Simplistically, that is correct. Without some form of work on your end Consumers are not "connected" to Sagas. The way that I'm reading your code is that you have a saga processing an event (the OrderRequestEvent) and a consumer than is getting its own copy of the event and doing its thing. This is the system working as intended.

    If you want these connected, then you'll need to add some logic to the Saga. The least amount of moving parts would be to build a Custom Saga Activity that would contain the same logic as the Consumer. The success or failure of that Activity would then follow the existing Saga logic.

    Generally, you want saga activities to be fast (on the order of an HTTP request), so if the validation needs to depend on a 3rd party service I'd suggest having the saga delegate that to a consumer. This is closer to what you have in the initial question. In this case, you would follow the Saga Requests Docs and have the saga issue a Request/Response that your consumer would need to process and then respond Request/Response Docs. In your existing model you are simply throwing an Exception, this will get sent back to the Requestor as a fault, so you'll want to handle that in your Saga.

    As for the code you posted, it's entirely possible you are trying to do this already, but the code is hidden behind the methods like SendingToCustomerValidation and LogSaga.

    In the end, this is all supported by MT and used regularly. So if it is not working, its almost ALWAYS a configuration issue. Make sure to turn up the logs to debug and see what is going on. :)