Tags: asp.net, azure, azureservicebus, azure-servicebus-topics, asp.net-core-hosted-services

Azure Service Bus not all messages received in hosted service web app


Inside a .NET web app, I set up a hosted service to receive messages from an Azure Service Bus topic. The problem is that not all messages are received, only an arbitrary number of them (e.g. out of 20 messages, only 12 are received). The rest end up in the dead-letter queue. This happens when the messages are sent simultaneously. I tried the following steps to solve this:

  1. Increased the amount of maximum concurrent calls, which helped but didn't provide a guarantee
  2. Added a prefetch count

I also tried sending messages with the built-in send functionality of the Service Bus resource in the Azure portal. Sending 500 messages with no interval did not work (not all messages were received). Sending 500 messages with a 1s interval worked: all messages were received.

I just don't understand why the receiver is not receiving all of the messages. I want to build an event-driven architecture and cannot leave it to chance whether all messages will be processed.

Startup.cs

...
/// <summary>
/// Registers the Service Bus topic subscription as a singleton and the
/// <see cref="WorkerServiceBus"/> worker as a hosted service.
/// </summary>
/// <param name="services">Service collection the registrations are added to.</param>
public void ConfigureServices(IServiceCollection services)
{
    // Both extension methods return the collection, so the registrations can be chained.
    services
        .AddSingleton<IServiceBusTopicSubscription, ServiceBusSubscription>()
        .AddHostedService<WorkerServiceBus>();
}
...
        

WorkerService.cs

/// <summary>
/// Hosted service that starts the Service Bus topic subscription when the host
/// starts and closes it again when the host stops.
/// </summary>
public class WorkerServiceBus : IHostedService, IDisposable
{
    private readonly ILogger<WorkerServiceBus> _logger;
    private readonly IServiceBusTopicSubscription _serviceBusTopicSubscription;
    private bool _disposed;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="serviceBusTopicSubscription">Subscription that is started, closed and disposed by this worker.</param>
    /// <param name="logger">Logger used for start/stop messages.</param>
    public WorkerServiceBus(IServiceBusTopicSubscription serviceBusTopicSubscription,
        ILogger<WorkerServiceBus> logger)
    {
        _serviceBusTopicSubscription = serviceBusTopicSubscription;
        _logger = logger;
    }

    /// <summary>
    /// Starts message handling on the subscription.
    /// </summary>
    /// <param name="stoppingToken">Token supplied by the host; not forwarded because the subscription API takes no token.</param>
    public async Task StartAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting the service bus queue consumer and the subscription");
        await _serviceBusTopicSubscription.PrepareFiltersAndHandleMessages().ConfigureAwait(false);
    }

    /// <summary>
    /// Closes the subscription so no further messages are handled.
    /// </summary>
    /// <param name="stoppingToken">Token supplied by the host; not forwarded because the subscription API takes no token.</param>
    public async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Stopping the service bus queue consumer and the subscription");
        await _serviceBusTopicSubscription.CloseSubscriptionAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes the underlying subscription.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the subscription. Safe to call more than once.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        if (disposing)
        {
            // This method used to be 'async void': Dispose() returned before the
            // subscription was actually disposed, and any exception thrown by
            // DisposeAsync was unobservable to the caller. IDisposable.Dispose is
            // synchronous, so wait for the asynchronous disposal to finish here.
            _serviceBusTopicSubscription.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }

        _disposed = true;
    }
}

ServiceBusSubscription.cs

/// <summary>
/// Receives messages from the "test" topic through the "test-subscriber"
/// subscription using a <see cref="ServiceBusProcessor"/>.
/// </summary>
public class ServiceBusSubscription : IServiceBusTopicSubscription
{
    private const string TOPIC_PATH = "test";
    private const string SUBSCRIPTION_NAME = "test-subscriber";

    private readonly IConfiguration _configuration;
    private readonly ILogger _logger;
    private readonly ServiceBusClient _client;
    private readonly IServiceScopeFactory _scopeFactory;
    private ServiceBusProcessor _processor;

    /// <summary>
    /// Creates the subscription and its Service Bus client from the
    /// "ServiceBus" connection string.
    /// </summary>
    /// <param name="configuration">Configuration that holds the "ServiceBus" connection string.</param>
    /// <param name="logger">Logger for received messages and processing errors.</param>
    /// <param name="scopeFactory">Factory used to create a DI scope per handled message.</param>
    // The constructor name must match the class name; it was previously declared as
    // 'ServiceBusBookingsSubscription', which does not compile for this class.
    public ServiceBusSubscription(IConfiguration configuration,
        ILogger<ServiceBusSubscription> logger,
        IServiceScopeFactory scopeFactory)
    {
        _configuration = configuration;
        _logger = logger;
        _scopeFactory = scopeFactory;

        var connectionString = _configuration.GetConnectionString("ServiceBus");
        var clientOptions = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };
        _client = new ServiceBusClient(connectionString, clientOptions);
    }

    /// <summary>
    /// Creates the processor for the topic subscription, wires up the message
    /// and error handlers, and starts processing.
    /// </summary>
    public async Task PrepareFiltersAndHandleMessages()
    {
        // NOTE(review): prefetched messages are presumably locked while they wait in the
        // local buffer, so a large PrefetchCount combined with a short entity lock
        // duration can let locks expire before the handler runs, and the affected
        // messages may end up dead-lettered. Confirm against the Service Bus prefetch
        // documentation before tuning these values.
        var processorOptions = new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 200,
            AutoCompleteMessages = false,
            PrefetchCount = 1000,
        };

        _processor = _client.CreateProcessor(TOPIC_PATH, SUBSCRIPTION_NAME, processorOptions);
        _processor.ProcessMessageAsync += ProcessMessagesAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;

        await _processor.StartProcessingAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Handles a single message: deserializes the body, processes the payload,
    /// and completes the message explicitly (auto-complete is disabled).
    /// </summary>
    /// <param name="args">Event arguments carrying the received message.</param>
    private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
    {
        // Message templates instead of interpolated strings: same rendered text,
        // but the body is captured as a structured log property.
        _logger.LogInformation("Received message from service bus");
        _logger.LogInformation("Message: {Body}", args.Message.Body);
        var payload = args.Message.Body.ToObjectFromJson<List<SchedulerBookingViewModel>>();

        // Create scoped dbcontext
        // NOTE(review): the context is resolved but not passed to DoThings in the code shown here.
        using var scope = _scopeFactory.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<dbContext>();

        // Process payload
        await new TestServiceBus().DoThings(payload);

        await args.CompleteMessageAsync(args.Message).ConfigureAwait(false);
    }

    /// <summary>
    /// Logs the exception and its origin for errors raised by the processor.
    /// </summary>
    /// <param name="arg">Event arguments describing the error.</param>
    private Task ProcessErrorAsync(ProcessErrorEventArgs arg)
    {
        _logger.LogError(arg.Exception, "Message handler encountered an exception");
        _logger.LogError("- ErrorSource: {ErrorSource}", arg.ErrorSource);
        _logger.LogError("- Entity Path: {EntityPath}", arg.EntityPath);
        _logger.LogError("- FullyQualifiedNamespace: {FullyQualifiedNamespace}", arg.FullyQualifiedNamespace);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the processor (if one was created) and then the client.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_processor != null)
        {
            await _processor.DisposeAsync().ConfigureAwait(false);
        }

        if (_client != null)
        {
            await _client.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Closes the processor. Does nothing when processing was never started.
    /// </summary>
    public async Task CloseSubscriptionAsync()
    {
        // _processor is only assigned in PrepareFiltersAndHandleMessages; guard against
        // a shutdown that happens before (or after a failed) start.
        if (_processor == null)
        {
            return;
        }

        await _processor.CloseAsync().ConfigureAwait(false);
    }
}

Solution

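  • This is how we solved the problem: it was related to the message lock duration, which is configured on the Service Bus entity in the Azure portal. Previous value: 30s. New value: 3min. With the short lock, messages that were not completed before their lock expired were redelivered and, once they exceeded the maximum delivery count, moved to the dead-letter queue.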