.netazureazureservicebusmasstransitdead-letter

.NET & Mass Transit - How to process messages in DLQ


I have an application in .NET and I use Mass Transit. In Azure I am running Azure Service Bus.

The following situation is happening to me. It happens that I get an exception in the consumer, for example, an unavailable database. It crashes into the Mass transit error queue on that I have a consumer too, however it crashes into the unavailable database there too. And even if I have retry set there, it doesn't get resolved. So the message goes to my DLQ. What now? I actually don't even care that it fails. The problem is that it doesn't flag something in the error queue in the database, and because of it my whole application gets “stuck”.

I don't know how to handle this situation. Should I write some routine that checks my DLQ and processes those messages? Or is there already something from Mass Transsit ready for this?

Thanks a lot


Solution

  • You can create a separate MassTransit receive endpoint or background service that listens to the DLQ directly and processes or logs those messages. Program.cs

           {
                        services.AddMassTransit(x =>
                        {
                            x.AddConsumer<MyMessageConsumer>();
    
                            x.UsingAzureServiceBus((context, cfg) =>
                            {
                                cfg.Host("your-connection-string", h =>
                                {
                                    h.TransportType = Microsoft.Azure.ServiceBus.TransportType.AmqpWebSockets;
                                });
    
                                cfg.ConfigureEndpoints(context);
                            });
                        });
    
                        services.AddHostedService<DeadLetterProcessorService>();
                    })
                    .Build()
                    .RunAsync();
            }
    
    public class MyMessage
    {
        public string Value { get; set; }
    }
    
    public class MyMessageConsumer : IConsumer<MyMessage>
    {
        public async Task Consume(ConsumeContext<MyMessage> context)
        {
            Console.WriteLine($"Received message: {context.Message.Value}");
    
            throw new Exception("Simulated database failure");
        }
    }
    
    

    DLQ Listener Service

    using Azure.Messaging.ServiceBus;
    using Microsoft.Extensions.Hosting;
    using System;
    using System.Text.Json;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class DeadLetterProcessorService : BackgroundService
    {
        private readonly string connectionString = "your-connection-string";
        private readonly string queueName = "your-queue-name"; // No "-$DeadLetterQueue" here
    
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var client = new ServiceBusClient(connectionString);
            var receiver = client.CreateReceiver(
                queueName,
                new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter
                });
    
            while (!stoppingToken.IsCancellationRequested)
            {
                var messages = await receiver.ReceiveMessagesAsync(maxMessages: 10, TimeSpan.FromSeconds(5), stoppingToken);
    
                foreach (var message in messages)
                {
                    try
                    {
                        string body = message.Body.ToString();
                        Console.WriteLine($"[DLQ] Received dead-lettered message: {body}");
    
                        await receiver.CompleteMessageAsync(message, stoppingToken);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"[DLQ] Failed to process message: {ex.Message}");
                    }
                }
    
                await Task.Delay(5000, stoppingToken);
            }
        }
    }
    
    

    Output