Tags: .net, azure, azureservicebus, azure-servicebus-queues, azure-servicebus-topics

ServiceBusRetryOptions not working with SessionProcessor


I have created a service that, on startup, opens a connection to a given Azure Service Bus topic. I am using the Azure.Messaging.ServiceBus SDK with .NET 7 and I am trying to implement a retry policy, but it seems that MaxDeliveryCount is the only setting that affects how many times a message is retried.

Here, I throw an exception from the message handler to observe the number of retries. I expected 2, but the message was delivered 10 times (i.e., DeliveryCount = 10), which matches the MaxDeliveryCount (10) configured for the subscription in the Azure Portal. Here is the code:

using Azure.Messaging.ServiceBus;

namespace ServiceBusProcessor;
/// <summary>
/// Hosted background service that attaches a <see cref="ServiceBusSessionProcessor"/> to a
/// topic subscription and logs every delivery it receives.
/// </summary>
/// <remarks>
/// The message handler deliberately throws on every delivery so that the resulting
/// <c>DeliveryCount</c> can be observed.
/// </remarks>
public class ServiceBusProcessorHandler : BackgroundService
{
    private static readonly string ConnectionString = "";
    private static readonly string TopicName = "";
    private static readonly string SubscriptionName = "";
    private readonly ServiceBusClient serviceBusClient;
    private readonly ServiceBusSessionProcessor serviceBusSessionProcessor;

    /// <summary>
    /// Creates the Service Bus client and the session processor for the configured
    /// topic subscription.
    /// </summary>
    public ServiceBusProcessorHandler()
    {
        // NOTE(review): per the Azure SDK documentation, ServiceBusRetryOptions govern how the
        // client retries its own operations against the service; they do not appear to re-run
        // the message handler when it throws. Handler failures are redelivered by the broker
        // until the entity's MaxDeliveryCount is reached - confirm against the SDK docs.
        this.serviceBusClient = new ServiceBusClient(ConnectionString, new ServiceBusClientOptions()
        {
            RetryOptions = new ServiceBusRetryOptions()
            {
                Mode = ServiceBusRetryMode.Exponential,
                MaxRetries = 2,
                MaxDelay = TimeSpan.FromMilliseconds(10000),
                Delay = TimeSpan.FromMilliseconds(1000)
            }
        });
        this.serviceBusSessionProcessor = this.serviceBusClient.CreateSessionProcessor(TopicName, SubscriptionName, new ServiceBusSessionProcessorOptions()
        {
            MaxConcurrentSessions = 100,
            PrefetchCount = 100,
            SessionIdleTimeout = TimeSpan.FromSeconds(1),
        });
    }

    /// <summary>
    /// Registers the message and error handlers and starts the session processor.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this.serviceBusSessionProcessor.ProcessMessageAsync += (args) =>
        {
            Console.WriteLine($"DeliveryCount: {args.Message.DeliveryCount}");
            Console.WriteLine(args.Message.Body);

            // Intentional failure: used to observe how often the message is redelivered.
            throw new NotImplementedException("Not Implemented");
        };

        this.serviceBusSessionProcessor.ProcessErrorAsync += (args) =>
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        // Flow the host's token so a shutdown requested during start-up is honoured.
        await this.serviceBusSessionProcessor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>
    /// Stops the session processor and then lets the base class finish shutting down.
    /// </summary>
    /// <param name="cancellationToken">Signalled when shutdown should no longer be graceful.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine("Service is stopping.");

        // Without this call the processor keeps receiving messages while the host shuts down.
        await this.serviceBusSessionProcessor.StopProcessingAsync(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
}

My use case is simple: when processing fails with an internal server error, I want to retry the message a limited number of times without scheduling it or republishing it to the Service Bus.


Solution

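  • ServiceBusRetryOptions only controls how the client retries its own calls to the Service Bus service; it does not re-run your message handler when the handler throws, which is why only MaxDeliveryCount appeared to have any effect. To cap handler retries at 2, I wrapped the processing logic in a Polly retry policy. I tried the following code, which throws an exception while processing a message received from the Azure Service Bus topic.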

    Code :

    using Azure.Messaging.ServiceBus;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Polly;
    using Polly.Retry;
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using ServiceBusProcessor = Azure.Messaging.ServiceBus.ServiceBusProcessor;
    
    namespace MyServiceBusProcessorApp 
    {
        /// <summary>
        /// Hosted background service that processes messages from a topic subscription and
        /// retries failed handler executions in-process with a Polly retry policy.
        /// </summary>
        public class ServiceBusProcessorHandler : BackgroundService
        {
            private static readonly string ConnectionString = "<ServiceBusTopic_connec_string>";
            private static readonly string TopicName = "<Topic_name>";
            private static readonly string SubscriptionName = "<TopicSubscription_name>";
            private readonly ServiceBusClient serviceBusClient;
            private readonly ServiceBusProcessor serviceBusProcessor;
            private readonly AsyncRetryPolicy retryPolicy;

            /// <summary>
            /// Creates the Service Bus client, the processor for the configured subscription and
            /// the retry policy applied around the message-handling logic.
            /// </summary>
            public ServiceBusProcessorHandler()
            {
                this.serviceBusClient = new ServiceBusClient(ConnectionString, new ServiceBusClientOptions()
                {
                    RetryOptions = new ServiceBusRetryOptions()
                    {
                        Mode = ServiceBusRetryMode.Exponential,
                        MaxRetries = 2,
                        MaxDelay = TimeSpan.FromMilliseconds(10000),
                        Delay = TimeSpan.FromMilliseconds(1000)
                    }
                });
                this.serviceBusProcessor = this.serviceBusClient.CreateProcessor(TopicName, SubscriptionName, new ServiceBusProcessorOptions()
                {
                    MaxConcurrentCalls = 100,
                    PrefetchCount = 100,
                });

                // Two retries after the initial attempt, waiting 2^attempt seconds (2 s, then 4 s).
                this.retryPolicy = Policy.Handle<Exception>()
                                         .WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                                            (exception, timeSpan, retryCount, context) =>
                                            {
                                                Console.WriteLine($"Retry {retryCount} encountered an error: {exception.Message}. Waiting {timeSpan} before next retry.");
                                            });
            }

            /// <summary>
            /// Registers the message and error handlers and starts the processor.
            /// </summary>
            /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                this.serviceBusProcessor.ProcessMessageAsync += async (args) =>
                {
                    try
                    {
                        // Pass the processor's token so the back-off waits are abandoned when
                        // processing is being stopped instead of delaying shutdown.
                        await this.retryPolicy.ExecuteAsync(async ct =>
                        {
                            Console.WriteLine($"DeliveryCount: {args.Message.DeliveryCount}");
                            Console.WriteLine(args.Message.Body);
                            throw new NotImplementedException("Not Implemented");
                        }, args.CancellationToken);
                    }
                    catch (Exception ex) when (ex is not OperationCanceledException)
                    {
                        // Cancellation is excluded above so that a shutdown is not mistaken for
                        // exhausted retries; it propagates to the processor instead.
                        Console.WriteLine($"Failed after retries: {ex.Message}");

                        // Returning normally would let the processor auto-complete the message and
                        // silently drop it; dead-letter it so the failure remains inspectable.
                        await args.DeadLetterMessageAsync(args.Message, "ProcessingFailed", ex.Message);
                    }
                };
                this.serviceBusProcessor.ProcessErrorAsync += (args) =>
                {
                    Console.WriteLine(args.Exception);
                    return Task.CompletedTask;
                };

                await this.serviceBusProcessor.StartProcessingAsync(stoppingToken);
            }

            /// <summary>
            /// Stops the processor and then lets the base class finish shutting down.
            /// </summary>
            /// <param name="cancellationToken">Signalled when shutdown should no longer be graceful.</param>
            public override async Task StopAsync(CancellationToken cancellationToken)
            {
                Console.WriteLine("Service is stopping.");
                await this.serviceBusProcessor.StopProcessingAsync(cancellationToken);
                await base.StopAsync(cancellationToken);
            }
        }
        /// <summary>
        /// Application entry point: builds the generic host and runs it.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Builds the host from the command-line arguments and runs it.
            /// </summary>
            /// <param name="args">Command-line arguments forwarded to the host builder.</param>
            public static void Main(string[] args)
            {
                IHost host = CreateHostBuilder(args).Build();
                host.Run();
            }

            /// <summary>
            /// Creates the default host builder and registers
            /// <see cref="ServiceBusProcessorHandler"/> as a hosted service.
            /// </summary>
            /// <param name="args">Command-line arguments passed to the default builder.</param>
            /// <returns>The configured host builder.</returns>
            public static IHostBuilder CreateHostBuilder(string[] args)
            {
                IHostBuilder builder = Host.CreateDefaultBuilder(args);
                return builder.ConfigureServices((_, serviceCollection) =>
                {
                    serviceCollection.AddHostedService<ServiceBusProcessorHandler>();
                });
            }
        }
    }
    

    Output :

    I sent a message to the Azure ServiceBus Topic as shown below:

    enter image description here

    I received the following message in the console.

    enter image description here