sessionservicebusazure-pack

Service Bus - Retrieve message from session by sequence number


I am currently trying to retrieve a specific message from a session.

To do so I use want to use the .Receive(Int64) on the MessageSession where I pass in the sequence number of the message.

Here is my code -

long msgSequenceNr = 1337;

QueueClient queueClient = QueueClient.CreateFromConnectionString(Constants.ServiceBusConnectionString, Constants.TestQueueEntityName, ReceiveMode.PeekLock);
MessageSession msgSession = queueClient.AcceptMessageSession(Constants.TestSessionId);

var peekedMsg = msgSession.Peek(msgSequenceNr); // <-- Works fine!
var receivedMsg = msgSession.Receive(msgSequenceNr); // <-- MessageNotFoundException

Unfortunately the Receive will result in a MessageNotFoundException while the Peek works fine. Is this a limitation that I missed or is there another way to achieve this.

Note that it is possible that there are multiple messages in the session


Solution

  • Receive with the SequenceNumber can only be used in combination with the Defer method. This is how you would implement it:

    1. Message received, but it can't be processed right now (maybe it's waiting for a different process to complete).
    2. Persist the SequenceNumber in some persistent storage (Table Storage, SQL Database, ...)
    3. When you know that processing can continue (eg: the dependent process is complete), load all SequenceNumbers from your persistent storage.
    4. Use Receive(int sequenceNumber) or ReceiveBatch(int[] sequenceNumbers) to received and process your deferred messages.

    Sample application: https://code.msdn.microsoft.com/windowsazure/Brokered-Messaging-ccc4f879#content

    Update:

    Form your comment I noticed that "undeferring" a deferred message could be a solution. Here's some sample code to undefer the message which copies the deferred message to a new message, Completes the deferred message and sends the new message back in the queue. This uses a TransactionScope to transactionally Complete and Resend the message to avoid the risk of losing the message:

        var messageId = "12434539828282";
    
        // Send.
        var msg = new BrokeredMessage {SessionId = "user1", MessageId = messageId };
        msg.Properties.Add("Language", "Dutch");
        queue.Send(msg);
    
        // Receive.
        var session = queue.AcceptMessageSession();
        msg = session.Receive();
    
        // Store the sequence number.
        var sequenceNumber = msg.SequenceNumber;
    
        // Defer.
        msg.Defer();
    
        // Change to true to test if the transaction worked.
        var shouldThrow = false;
    
        // Later processing of deferred message.
        msg = session.Receive(sequenceNumber);
    
        try
        {
            using (var ts = new TransactionScope())
            {
                // Create a new message.
                var undeferredMessage = new BrokeredMessage {SessionId = msg.SessionId, MessageId = msg.MessageId};
                foreach (var prop in msg.Properties)
                    undeferredMessage.Properties.Add(prop);
    
                // Complete and send within the same transaction.
                msg.Complete();
                if (shouldThrow)
                    throw new InvalidOperationException("Some error");
                queue.Send(undeferredMessage);
    
                // Complete the transaction.
                ts.Complete();
            }
        }
        catch (Exception ex)
        {
            msg.Abandon();
        }
    
        if (shouldThrow)
        {
            msg = session.Receive(sequenceNumber);
            Console.WriteLine(msg.MessageId + " should match: " + messageId);
        }
        else
        {
            try
            {
                msg = session.Receive(sequenceNumber);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Message not found, transaction worked OK.");
            }
        }
    

    Note: here I'm simply taking a copy of the Properties. Keep into account that you might want to copy the Body and any other additional information.