I have created a service that starts and opens a connection to a given Azure Service Bus topic. I am using the Azure.Messaging.ServiceBus SDK with .NET 7 and I am trying to implement a retry policy, but it seems that MaxDeliveryCount is the only setting that affects how many times the processing of a message is retried.
Here, I throw an exception in the message handler to observe the number of retries, which I expected to be 2 but which turns out to be 10 (i.e. DeliveryCount = 10). This matches the MaxDeliveryCount (10) that I set in the Azure Portal. Here is the code:
using Azure.Messaging.ServiceBus;
namespace ServiceBusProcessor;
/// <summary>
/// Hosted service that listens on a session-enabled Service Bus topic subscription
/// and logs every message it receives.
/// </summary>
/// <remarks>
/// The <see cref="ServiceBusRetryOptions"/> configured on the client control how the SDK
/// retries its own operations against the service (see the Azure SDK documentation).
/// They do not limit how often a message is redelivered after the handler throws:
/// a failed message is abandoned and redelivered by the broker until the entity's
/// MaxDeliveryCount is reached.
/// </remarks>
public class ServiceBusProcessorHandler : BackgroundService
{
    private static readonly string ConnectionString = "";
    private static readonly string TopicName = "";
    private static readonly string SubscriptionName = "";

    private readonly ServiceBusClient serviceBusClient;
    private readonly ServiceBusSessionProcessor serviceBusSessionProcessor;

    /// <summary>
    /// Creates the Service Bus client and the session processor for the configured
    /// topic subscription. Nothing is received until <see cref="ExecuteAsync"/> runs.
    /// </summary>
    public ServiceBusProcessorHandler()
    {
        this.serviceBusClient = new ServiceBusClient(ConnectionString, new ServiceBusClientOptions()
        {
            // Applies to the client's service operations only, not to handler failures.
            RetryOptions = new ServiceBusRetryOptions()
            {
                Mode = ServiceBusRetryMode.Exponential,
                MaxRetries = 2,
                MaxDelay = TimeSpan.FromMilliseconds(10000),
                Delay = TimeSpan.FromMilliseconds(1000)
            }
        });

        this.serviceBusSessionProcessor = this.serviceBusClient.CreateSessionProcessor(TopicName, SubscriptionName, new ServiceBusSessionProcessorOptions()
        {
            MaxConcurrentSessions = 100,
            PrefetchCount = 100,
            SessionIdleTimeout = TimeSpan.FromSeconds(1),
        });
    }

    /// <summary>
    /// Registers the message and error handlers and starts the session processor.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down; forwarded to the processor start-up.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this.serviceBusSessionProcessor.ProcessMessageAsync += (args) =>
        {
            Console.WriteLine($"DeliveryCount: {args.Message.DeliveryCount}");
            Console.WriteLine(args.Message.Body);

            // Deliberate failure used to observe how often the message is redelivered.
            throw new NotImplementedException("Not Implemented");
        };

        this.serviceBusSessionProcessor.ProcessErrorAsync += (args) =>
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        await this.serviceBusSessionProcessor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>
    /// Stops the session processor so no further messages are received, then completes
    /// the base shutdown.
    /// </summary>
    /// <param name="cancellationToken">Signalled when the graceful shutdown period has elapsed.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine("Service is stopping.");
        try
        {
            // Without this call the processor keeps receiving after the host requested a stop.
            await this.serviceBusSessionProcessor.StopProcessingAsync(cancellationToken);
        }
        finally
        {
            // Run the base shutdown even when stopping the processor fails or is cancelled.
            await base.StopAsync(cancellationToken);
        }
    }
}
My use case is simple: retry processing a message, without scheduling it (or republishing it to the Service Bus), when there is an internal server error.
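I tried the following code, which caps the number of retries at 2 when processing a message received from the Azure Service Bus topic throws an exception. Because ServiceBusRetryOptions only applies to the client's own operations against the service, the retries of the message handler are done in-process with a Polly retry policy.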
Code :
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Polly;
using Polly.Retry;
using System;
using System.Threading;
using System.Threading.Tasks;
using ServiceBusProcessor = Azure.Messaging.ServiceBus.ServiceBusProcessor;
namespace MyServiceBusProcessorApp
{
/// <summary>
/// Hosted service that receives messages from a Service Bus topic subscription and
/// retries failed processing in-process with a Polly policy instead of relying on
/// broker redelivery.
/// </summary>
public class ServiceBusProcessorHandler : BackgroundService
{
    /// <summary>Number of in-process retries after the first failed attempt.</summary>
    private const int MaxProcessingRetries = 2;

    /// <summary>
    /// Upper bound applied to the dead-letter description before it is sent.
    /// NOTE(review): 4096 is the limit enforced by the current SDK; confirm on upgrade.
    /// </summary>
    private const int MaxDeadLetterDescriptionLength = 4096;

    private static readonly string ConnectionString = "<ServiceBusTopic_connec_string>";
    private static readonly string TopicName = "<Topic_name>";
    private static readonly string SubscriptionName = "<TopicSubscription_name";

    private readonly ServiceBusClient serviceBusClient;
    private readonly ServiceBusProcessor serviceBusProcessor;
    private readonly AsyncRetryPolicy retryPolicy;

    /// <summary>
    /// Creates the Service Bus client, the processor for the configured subscription
    /// and the retry policy used by the message handler.
    /// </summary>
    public ServiceBusProcessorHandler()
    {
        this.serviceBusClient = new ServiceBusClient(ConnectionString, new ServiceBusClientOptions()
        {
            // Applies to the client's service operations only, not to handler failures.
            RetryOptions = new ServiceBusRetryOptions()
            {
                Mode = ServiceBusRetryMode.Exponential,
                MaxRetries = 2,
                MaxDelay = TimeSpan.FromMilliseconds(10000),
                Delay = TimeSpan.FromMilliseconds(1000)
            }
        });

        this.serviceBusProcessor = this.serviceBusClient.CreateProcessor(TopicName, SubscriptionName, new ServiceBusProcessorOptions()
        {
            MaxConcurrentCalls = 100,
            PrefetchCount = 100,
        });

        // Waits 2^attempt seconds between attempts (2 s, then 4 s).
        this.retryPolicy = Policy.Handle<Exception>()
            .WaitAndRetryAsync(
                MaxProcessingRetries,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                (exception, timeSpan, retryCount, context) =>
                {
                    Console.WriteLine($"Retry {retryCount} encountered an error: {exception.Message}. Waiting {timeSpan} before next retry.");
                });
    }

    /// <summary>
    /// Registers the message and error handlers and starts the processor.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down; forwarded to the processor start-up.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        this.serviceBusProcessor.ProcessMessageAsync += this.HandleMessageAsync;
        this.serviceBusProcessor.ProcessErrorAsync += HandleErrorAsync;

        await this.serviceBusProcessor.StartProcessingAsync(stoppingToken);
    }

    /// <summary>
    /// Stops the processor so no further messages are received, then completes the
    /// base shutdown.
    /// </summary>
    /// <param name="cancellationToken">Signalled when the graceful shutdown period has elapsed.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine("Service is stopping.");
        try
        {
            await this.serviceBusProcessor.StopProcessingAsync(cancellationToken);
        }
        finally
        {
            // Run the base shutdown even when stopping the processor fails or is cancelled.
            await base.StopAsync(cancellationToken);
        }
    }

    /// <summary>
    /// Processes one message under the retry policy and dead-letters it when every
    /// attempt has failed.
    /// </summary>
    /// <param name="args">The received message together with its settlement operations.</param>
    private async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        try
        {
            await this.retryPolicy.ExecuteAsync(async () =>
            {
                Console.WriteLine($"DeliveryCount: {args.Message.DeliveryCount}");
                Console.WriteLine(args.Message.Body);

                // Deliberate failure used to exercise the retry policy.
                throw new NotImplementedException("Not Implemented");
            });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed after retries: {ex.Message}");

            // Returning normally would let the processor auto-complete the message and
            // silently drop it. Dead-letter it so the failure stays visible and the
            // broker does not redeliver it up to MaxDeliveryCount.
            string description = ex.Message.Length > MaxDeadLetterDescriptionLength
                ? ex.Message.Substring(0, MaxDeadLetterDescriptionLength)
                : ex.Message;
            await args.DeadLetterMessageAsync(args.Message, "ProcessingFailed", description);
        }
    }

    /// <summary>
    /// Logs errors raised by the processor itself or escaping the message handler.
    /// </summary>
    /// <param name="args">Details of the error reported by the processor.</param>
    private static Task HandleErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception);
        return Task.CompletedTask;
    }
}
/// <summary>
/// Application entry point: builds the generic host and runs it until shutdown.
/// </summary>
public class Program
{
    /// <summary>Builds the host from the command-line arguments and runs it.</summary>
    /// <param name="args">Command-line arguments passed to the host builder.</param>
    public static void Main(string[] args)
    {
        IHost host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates the default host builder and registers
    /// <see cref="ServiceBusProcessorHandler"/> as a hosted service.
    /// </summary>
    /// <param name="args">Command-line arguments passed to the default builder.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        IHostBuilder builder = Host.CreateDefaultBuilder(args);
        return builder.ConfigureServices((_, serviceCollection) =>
        {
            serviceCollection.AddHostedService<ServiceBusProcessorHandler>();
        });
    }
}
}
Output :
I sent a message to the Azure ServiceBus Topic as shown below:
I received the following message in the console.