rabbitmq, masstransit

Two MassTransit consumers consuming the same request at the same time?


I know this should not be possible, so I'm thinking we configured something terribly wrong. We were getting 409 (Conflict) errors from a third-party API we use, so we tried to reproduce the problem locally. If I put a breakpoint in the Consume method in each of two instances of our worker application (which contains our MassTransit consumers), then for a single request put on the bus, both Consume methods get called at the same time. The two local workers run from completely separate copies of the code (two clones of the same repo), and RabbitMQ shows two separate registered workers along with the single bus.

We add our consumers like so:

/// <summary>
/// Registers the MassTransit host options, the <c>RequestConsumer_MyConsumer</c> consumer
/// (with its definition) and a RabbitMQ bus on the given service collection.
/// </summary>
/// <param name="services">Service collection the MassTransit registrations are added to.</param>
/// <param name="configuration">Configuration the "RabbitMqUsername" and "RabbitMqPassword" values are read from.</param>
// NOTE(review): this snippet is truncated - the closing of the AddMassTransit lambda and of the
// method body are not shown.
public static void ConfigureWorkerServices(IServiceCollection services, IConfiguration configuration){

services.AddOptions<MassTransitHostOptions>()
    .Configure(options =>
    {
        // if specified, waits until the bus is started before
        // returning from IHostedService.StartAsync
        // default is false
        options.WaitUntilStarted = true;

        // if specified, limits the wait time when starting the bus
        options.StartTimeout = TimeSpan.FromSeconds(10);

        // if specified, limits the wait time when stopping the bus
        options.StopTimeout = TimeSpan.FromSeconds(30);
    });

services.AddMassTransit(x =>
{        
    // Register the consumer together with its definition class.
    x.AddConsumer<RequestConsumer_MyConsumer, RequestConsumer_MyConsumer_Definition>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        // NOTE(review): rabbitMqUsername / rabbitMqPassword (and rabbitMqHost / rabbitMqPort used
        // below) are not declared in this snippet - presumably members declared elsewhere; confirm.
        rabbitMqUsername = configuration.GetValue<string>("RabbitMqUsername");
        rabbitMqPassword = configuration.GetValue<string>("RabbitMqPassword");
        // Bus-level settings: Durable off, AutoDelete on.
        cfg.Durable = false;
        cfg.AutoDelete = true;
        // NOTE(review): passes null for "x-expires" - presumably meant to clear any queue expiry; confirm.
        cfg.SetQueueArgument("x-expires", null);
        // Connect to the "/" virtual host with the credentials read above.
        cfg.Host(rabbitMqHost, rabbitMqPort, "/", h =>
            {
                h.Username(rabbitMqUsername);
                h.Password(rabbitMqPassword);
                // SSL is enabled; a certificate name mismatch is allowed as a policy error.
                h.UseSsl(s =>
                {
                    s.AllowPolicyErrors(SslPolicyErrors.RemoteCertificateNameMismatch);
                });
            });            
        // NOTE(review): no queue name is passed to ReceiveEndpoint here, although the consumer
        // definition names its endpoint "myconsumer". Presumably each running instance therefore
        // gets its own queue and its own copy of every message - verify against the MassTransit docs.
        cfg.ReceiveEndpoint(e =>
        {
            // Add the consumers here
            e.ConfigureConsumer<RequestConsumer_MyConsumer>(context);
        });
        
    });

And the consumer definition looks like so:

    /// <summary>
    /// MassTransit consumer that handles <c>IRequest_MyRequest</c> messages.
    /// </summary>
    public class RequestConsumer_MyConsumer : IConsumer<IRequest_MyRequest>
    {
        // Assigned once in the constructor and never reassigned, so this is a readonly
        // field rather than a private auto-property with a setter.
        private readonly ILogger<RequestConsumer_MyConsumer> _logger;

        /// <summary>
        /// Creates the consumer.
        /// </summary>
        /// <param name="logger">Logger stored for use by this consumer.</param>
        public RequestConsumer_MyConsumer(ILogger<RequestConsumer_MyConsumer> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Handles one <c>IRequest_MyRequest</c> message.
        /// </summary>
        /// <param name="context">Consume context carrying the message.</param>
        public async Task Consume(ConsumeContext<IRequest_MyRequest> context)
        {
            //Consume code
        }
    }

/// <summary>
/// Consumer definition for <c>RequestConsumer_MyConsumer</c>: sets the consumer's
/// concurrency limit and endpoint name, and adds message retry to the endpoint.
/// </summary>
public class RequestConsumer_MyConsumer_Definition :
    ConsumerDefinition<RequestConsumer_MyConsumer>
{
    /// <summary>
    /// Sets the concurrent message limit and names the endpoint "myconsumer".
    /// </summary>
    public RequestConsumer_MyConsumer_Definition()
    {
        // Cap on messages consumed concurrently; this applies to the
        // consumer only, not to the endpoint as a whole.
        ConcurrentMessageLimit = 8;

        Endpoint(endpoint =>
        {
            endpoint.Name = "myconsumer";
        });
    }

    /// <summary>
    /// Adds interval-based message retry to the receive endpoint.
    /// </summary>
    /// <param name="endpointConfigurator">Receive endpoint the retry is added to.</param>
    /// <param name="consumerConfigurator">Consumer configurator (not used here).</param>
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<RequestConsumer_MyConsumer> consumerConfigurator)
    {
        // Retry delays, expressed in milliseconds.
        int[] retryDelaysMs = { 10000, 20000, 50000, 80000, 100000 };

        endpointConfigurator.UseMessageRetry(retry =>
        {
            retry.Intervals(retryDelaysMs);
        });
    }
}

I am sure this is our fault, just trying to figure out how. Thanks in advance.


Solution

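  • Closing the loop on this for anyone encountering this problem: in the app's MassTransit config (the first code block), you must specify the queue name in the ReceiveEndpoint call. Without an explicit name, each worker instance gets its own queue, so every instance receives its own copy of the message instead of the instances competing for messages on one shared queue. Specify it here: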

    cfg.ReceiveEndpoint(<QUEUE NAME GOES HERE>, e =>
    {
        e.ConfigureConsumer<RequestConsumer_MyConsumer>(context);