asp.net-coremessage-queueazureservicebusmessagingazure-servicebus-queues

Azure.Messaging.ServiceBus Error when completing or abandoning message


So I'm currently looking into updating our very simple service bus service to the latest version (Azure.Messaging.Servicebus) and I'm running into a small problem here.

The thing is I want to complete or abandon received or peaked messages manually by delegating the message back to methods in my service class to handle the job.

Here is my simple class so far, exposed by an interface.

using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;

namespace myProject.Services
{
    /// <summary>
    /// Service bus service controller
    /// </summary>
    public class ServiceBusService: IServiceBusService
    {
        private IConfigurationUtility _configurationUtility;
        static string connectionString;
        static ServiceBusClient client;
        static ServiceBusSender sender;
        static ServiceBusReceiver receiver;

        public ServiceBusService(IConfigurationUtility configurationUtility)
        {
            _configurationUtility = configurationUtility;
            connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
            client = new ServiceBusClient(connectionString);
        }

        /// <summary>
        /// Sending message.
        /// </summary>
        /// <param name="messageContent"></param>
        /// <param name="queueName"></param>
        public void SendMessage(string messageContent, string queueName)
        {
            sender = client.CreateSender(queueName);
            ServiceBusMessage message = new ServiceBusMessage(messageContent);
            sender.SendMessageAsync(message).Wait();
        }

        /// <summary>
        /// Receive message.
        /// </summary>
        /// <returns></returns>
        /// <param name="queueName"></param>
        public ServiceBusReceivedMessage ReceiveMessage(string queueName)
        {
            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
            return receivedMessage;
        }

        /// <summary>
        /// Peek message.
        /// </summary>
        /// <returns></returns>
        public ServiceBusReceivedMessage PeekMessage(string queueName)
        {
            receiver = client.CreateReceiver(queueName);
            ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
            return peekedMessage;
        }

        /// <summary>
        /// Complete message.
        /// </summary>
        /// <param name="message"></param>
        public void CompleteMessage(ServiceBusReceivedMessage message)
        {
            receiver.CompleteMessageAsync(message).Wait();
        }

        /// <summary>
        /// Abandon message.
        /// </summary>
        /// <param name="message"></param>
        public void AbandonMessage(ServiceBusReceivedMessage message)
        {
            receiver.AbandonMessageAsync(message).Wait();
        }
    }
}

Everything works well when I'm handling one message at a time, but if I would process two messages at a time for example I get this error:

The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance

And I think the "or was received by a different receiver instance" part is my problem.

Am I thinking about this the wrong way? shouldn't I be able to handle for example: abandoning multiple single received messages at a time?

Thanks in advance.

UPDATE:

So I think I managed to get it working in regards to what Jesse Squire have been suggested to do in reflection to the new Servicebus documentation provided by Microsoft.

I created a new class called ServicebusFactory, added the code provided by Jesse below, to test it out and changed the "= new();" part for the initialized "ConcurrentDictionary's" at the top of the example, as initializing them like this resulted in the error below:

CS8400 Feature 'target-typed object creation' is not available in C# 8.0. Please use language version 9.0 or greater.

Result:

public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
    private readonly ServiceBusClient _client;
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
    private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();

    public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
    public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);

    public ServiceBusSender GetSender(string entity) =>
        _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));

    public ServiceBusReceiver GetReceiver(string entity) =>
        _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));

    public async ValueTask DisposeAsync()
    {
        await _client.DisposeAsync().ConfigureAwait(false);
        GC.SuppressFinalize(this);
    }
}

I then created an interface for my service bus factory and added this as a singleton to the bottom of my ConfigurationService method in my Startup.cs class, initializing it with my service bus connection string.

Interface:

public interface IServiceBusFactory
{
   public ServiceBusSender GetSender(string entity);
   public ServiceBusReceiver GetReceiver(string entity);
   public ValueTask DisposeAsync();
}

Startup:

var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));

Finally it was a matter of dependency inject the servicebusFactory interface to my controller constructor and then use it to get a "sender" and using it to send a message to my queue called "Add".

Constructor:

private readonly IServiceBusFactory _serviceBusFactory;

public ServicebusController(IServiceBusFactory serviceBusFactory) 
{
    _serviceBusFactory = serviceBusFactory; 
}

Controller Method/Action implementation:

var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));

var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);

Solution

  • Service Bus associates a message lock with the AMQP link from which the message was received. For the SDK, this means that you must settle the message with the same ServiceBusReceiver instance that you used to receive it.

    In your code, you're creating a new receiver for each ReceiveMessage call - so when you attempt to complete or abandon the message, you're using a link for which the message is not valid if any other call to ReceiveMessage has taken place.

    Generally, you want to avoid the pattern of creating short-lived Service Bus client objects. They're intended to be long-lived and reused over the lifetime of the application. In your code, you're also implicitly abandoning the senders/receivers without closing them. This is going to orphan the AMQP link until the service force-closes it for being idle after 20 minutes.

    I'd recommend pooling your senders/receivers and keeping each as a singleton for the associated queue. Each call to SendMessage or ReceiveMessage should for a given queue should use the same sender/receiver instance.

    When your application closes, be sure to close or dispose the ServiceBusClient, which will ensure that all of its child senders/receivers are also cleaned up appropriately.

    I'd also very strongly recommend refactoring your class to be async. The sync-over-async pattern that you're using is going to put additional pressure on the thread pool and is likely to result in thread starvation and/or deadlocks under load.

    UPDATE

    To add some additional context, I'd advise not wrapping Service Bus operations but, instead, have a factory that focuses on managing clients and letting callers interact directly with them.

    This ensures that clients are pooled and their lifetimes are managed correctly, while also giving flexibility to callers to hold onto the sender/receiver reference and use for multiple operations rather than paying the cost to retrieve it.

    As an example, the following is a simple factory class that you'd create and manage as a singleton in your application. Callers are able to request a sender/receiver for a specific queue/topic/subscription and they'll be created as needed and then pooled for reuse.

    // This class is intended to be treated as a singleton for most 
    // scenarios.  Each instance created will open an independent connection
    // to the Service Bus namespace, shared by senders and receivers spawned from it.
    
    public class ServiceBusFactory : IAsyncDisposable
    {
        // If throughput needs scale beyond a single connection, the factory can
        // manage multiple clients and ensure that child entities are evenly distributed
        // among them.
        
        private readonly ServiceBusClient _client;
        private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
        private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
    
        public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
        public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
    
        public ServiceBusSender GetSender(string entity) =>
             _senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
    
        public ServiceBusReceiver GetReceiver(string entity) =>
            _receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
    
        public async ValueTask DisposeAsync()
        {
            await _client.DisposeAsync().ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }
    }