azureservicebus, rebus, azure-servicebus-topics, rebus-azureservicebus

Rebus listening to plain Azure Service Bus


In my scenario, I've implemented Rebus (using Azure Service Bus) in my application (I'm going to make a PoC using sagas).

The problem is that the topic I'm listening to is plain Azure Service Bus only, nothing fancy: whatever publishes to it does not use Rebus.

I'm subscribing to the topic, moving the message to my queue and getting this error:

Rebus.Exceptions.RebusApplicationException: Received message with empty or absent 'rbs2-msg-id' header! All messages must be supplied with an ID . If no ID is present, the message cannot be tracked between delivery attempts, and other stuff would also be much harder to do - therefore, it is a requirement that messages be supplied with an ID.

Is there a way to decorate the message before it hits the queue, to transform it into something that Rebus would accept?

Or do I need a separate bus that handles those messages and re-sends them as Rebus-compliant messages to a topic/queue?


Solution

  • That error message comes from the SimpleRetryStrategyStep, which is usually the first step executed in Rebus' incoming messages pipeline.

    One option would be to decorate ITransport in the receiver's Rebus instance, which gives you a place to supply a message ID before the message enters the pipeline. It can be done somewhat like this:

    Configure.With(...)
        .Transport(t => {
            t.UseAzureServiceBus(...);
    
            // c is the resolution context; c.Get<ITransport>() returns the transport registered above
            t.Decorate(c => new MyTransportDecorator(c.Get<ITransport>()));
        })
        .Start();
    

    where MyTransportDecorator is then a decorator that could look like this:

    class MyTransportDecorator : ITransport
    {
        readonly ITransport _transport;
    
        public MyTransportDecorator(ITransport transport) => _transport = transport;
    
        public void CreateQueue(string address) => _transport.CreateQueue(address);
    
        public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
            => _transport.Send(destinationAddress, message, context);
    
        public async Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
        {
            var message = await _transport.Receive(context, cancellationToken);
    
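            // null means no message was available on this receive attempt
            if (message == null) return null;
    
            // add the missing 'rbs2-msg-id' header before Rebus' pipeline sees the message
            ProvideMessageIdSomehow(message);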
    
            return message;
        }
    
        public string Address => _transport.Address;
    }
    

    where ProvideMessageIdSomehow then adds the required rbs2-msg-id header to the message's headers.
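
    As a rough sketch of what ProvideMessageIdSomehow could do, the helper below works on a plain header dictionary, which is how Rebus exposes headers on TransportMessage (message.Headers). The header name rbs2-msg-id is taken from the error message in the question. The class name MessageIdSketch and the sender-message-id header are made up for illustration; whether the sender's own ID shows up as a header at all depends on your setup, so treat that part as an assumption.

    ```csharp
    using System;
    using System.Collections.Generic;

    static class MessageIdSketch
    {
        // Header name as quoted in the exception text
        const string MessageIdHeader = "rbs2-msg-id";

        // Hypothetical header in which the sender's own ID might arrive; adjust or remove
        const string SenderIdHeader = "sender-message-id";

        public static void ProvideMessageIdSomehow(Dictionary<string, string> headers)
        {
            // Leave messages alone if they already carry a usable ID
            if (headers.TryGetValue(MessageIdHeader, out var existing)
                && !string.IsNullOrWhiteSpace(existing))
            {
                return;
            }

            // Prefer an ID that stays the same across redeliveries of the same message
            if (headers.TryGetValue(SenderIdHeader, out var senderId)
                && !string.IsNullOrWhiteSpace(senderId))
            {
                headers[MessageIdHeader] = senderId;
                return;
            }

            // Fallback: a fresh GUID, which will differ on every delivery attempt
            headers[MessageIdHeader] = Guid.NewGuid().ToString();
        }
    }
    ```

    Inside the decorator's Receive method this would be called as MessageIdSketch.ProvideMessageIdSomehow(message.Headers). Keep in mind that the GUID fallback produces a new ID each time the same message is received, so, as the error message hints, Rebus would not be able to track that message between delivery attempts. If the incoming messages carry any stable identifier, using that is probably the better choice.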