asp.net-core easynetq

How to write an EasyNetQ auto subscriber dispatcher for the ASP.NET Core services provider?


The EasyNetQ auto subscriber ships with a basic default dispatcher that cannot create message consumer classes unless they have a parameterless constructor.

To see that in action, create a consumer with a required dependency. You can set up your own service or use ILogger<T>, which the framework registers by default.

ConsumeTextMessage.cs

public class ConsumeTextMessage : IConsume<TextMessage>
{
    private readonly ILogger<ConsumeTextMessage> logger;

    public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
    {
        this.logger = logger;
    }

    public void Consume(TextMessage message)
    {
        ...
    }
}

Wire up the auto subscriber (there's some leeway here as far as where/when to write/run this code).

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}

(Somewhere else, maybe Startup.Configure or a BackgroundService)

var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

Now start the program and publish some messages. You should see every message end up in the default error queue.

System.MissingMethodException: No parameterless constructor defined for this object.
   at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
   at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
   at System.Activator.CreateInstance[T]()
   at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
   at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)

I know that I can provide my own dispatcher, but how do I get that working with the ASP.NET Core service provider in a way that also works with scoped services?


Solution

  • So, here's what I came up with.

    public class MessageDispatcher : IAutoSubscriberMessageDispatcher
    {
        private readonly IServiceProvider provider;
    
        public MessageDispatcher(IServiceProvider provider)
        {
            this.provider = provider;
        }
    
        public void Dispatch<TMessage, TConsumer>(TMessage message)
            where TMessage : class
            where TConsumer : class, IConsume<TMessage>
        {
            using(var scope = provider.CreateScope())
            {
                var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
                consumer.Consume(message);
            }
        }
    
        public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
            where TMessage : class
            where TConsumer : class, IConsumeAsync<TMessage>
        {
            using(var scope = provider.CreateScope())
            {
                var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
                await consumer.ConsumeAsync(message);
            }
        }
    }
    

    A couple of notable points...

    The IServiceProvider dependency is the ASP.NET Core DI container. This may be unclear at first because, throughout Startup.ConfigureServices(), you register types through a different interface, IServiceCollection.

    public MessageDispatcher(IServiceProvider provider)
    {
        this.provider = provider;
    }
    

    In order to resolve scoped services, you need to create a scope and manage its lifecycle around creating and using the consumer. I'm using the GetRequiredService<T> extension method because I want an exception right here if the consumer isn't registered, rather than a null that travels for a while before it surfaces as a NullReferenceException somewhere else.

    using(var scope = provider.CreateScope())
    {
        var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
        consumer.Consume(message);
    }
    

    If you only used the provider directly, as in provider.GetRequiredService<T>(), you'd see an error like this when attempting to resolve a scoped consumer or a scoped dependency for a consumer.

    Exception thrown: 'System.InvalidOperationException' in Microsoft.Extensions.DependencyInjection.dll: 'Cannot resolve scoped service 'Example.Messages.ConsumeTextMessage' from root provider.'
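To see the difference between the root provider and a scope in isolation, here is a minimal sketch that uses only Microsoft.Extensions.DependencyInjection. ScopedConsumer and ScopeValidationDemo are hypothetical names, not part of the code above. It assumes scope validation is switched on via BuildServiceProvider(validateScopes: true), which is the check that raises the error shown; ASP.NET Core enables it by default in the Development environment.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for a scoped message consumer.
public class ScopedConsumer { }

public static class ScopeValidationDemo
{
    // Returns true when resolving the scoped service from the root provider is rejected.
    public static bool RootResolutionFails()
    {
        var services = new ServiceCollection();
        services.AddScoped<ScopedConsumer>();

        // validateScopes: true enables the "from root provider" check.
        using (var root = services.BuildServiceProvider(validateScopes: true))
        {
            try
            {
                root.GetRequiredService<ScopedConsumer>();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }

    // Returns true when the same service resolves from a child scope.
    public static bool ScopedResolutionSucceeds()
    {
        var services = new ServiceCollection();
        services.AddScoped<ScopedConsumer>();

        using (var root = services.BuildServiceProvider(validateScopes: true))
        using (var scope = root.CreateScope())
        {
            return scope.ServiceProvider.GetRequiredService<ScopedConsumer>() != null;
        }
    }
}
```

Both methods register the service the same way; the only difference is which provider is asked for the instance, which is exactly what the dispatcher changes by calling CreateScope().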

    In order to resolve scoped services and maintain their lifecycle properly for async consumers, you need to get the async/await keywords in the right place. You should await the ConsumeAsync call, which requires that the method be async. If you return the task without awaiting it, the using block ends and disposes the scope while the consumer may still be running. Use breakpoints on the await line and in your consumer and step line-by-line to get a better handle on this!

    public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class
        where TConsumer : class, IConsumeAsync<TMessage>
    {
        using(var scope = provider.CreateScope())
        {
            var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
            await consumer.ConsumeAsync(message);
        }
    }
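The effect of the await can be reproduced without EasyNetQ or a container. The following is only a sketch: FakeScope is a hypothetical stand-in for IServiceScope that records when it is disposed, and the private ConsumeAsync method simulates a consumer that checks, once its work is done, whether its scope was still alive.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for IServiceScope: records when it is disposed.
public sealed class FakeScope : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

public static class AwaitDemo
{
    // Simulated consumer: reports whether the scope was still alive when the work finished.
    private static async Task<bool> ConsumeAsync(FakeScope scope)
    {
        await Task.Delay(50);
        return !scope.Disposed;
    }

    // Not awaited: the using block exits and disposes the scope
    // while the consumer is still waiting on its delay.
    public static Task<bool> DispatchWithoutAwait()
    {
        using (var scope = new FakeScope())
        {
            return ConsumeAsync(scope);
        }
    }

    // Awaited: the scope is disposed only after the consumer has finished.
    public static async Task<bool> DispatchWithAwait()
    {
        using (var scope = new FakeScope())
        {
            return await ConsumeAsync(scope);
        }
    }
}
```

DispatchWithoutAwait completes with false (the scope was gone before the consumer finished), while DispatchWithAwait completes with true. With a real scope, the first variant tends to show up as ObjectDisposedException inside the consumer's dependencies.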
    

    OK, now that we have the dispatcher, we just need to set everything up properly in Startup. The dispatcher has to be resolved from the provider so that the container injects itself as the dispatcher's IServiceProvider. This is just one way of doing this.

    Startup.cs

    public void ConfigureServices(IServiceCollection services)
    {
        // messaging
        services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
        services.AddSingleton<MessageDispatcher>();
        services.AddSingleton<AutoSubscriber>(provider =>
        {
            var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
            {
                AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>()
            };
            return subscriber;
        });
    
        // message handlers
        services.AddScoped<ConsumeTextMessage>();
    }
    
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
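        // Note: SubscribeAsync is meant for IConsumeAsync<T> consumers; use Subscribe for IConsume<T> ones such as ConsumeTextMessage.
        app.ApplicationServices.GetRequiredService<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());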
    }
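The question also mentions a BackgroundService as a possible place to start the subscriptions. As a rough sketch of that alternative, the subscribe call can move out of Configure into a hosted service. AutoSubscriberHostedService is a hypothetical name. The sketch assumes AutoSubscriber is registered as a singleton as in ConfigureServices above, that the consumers live in the same assembly as this class, and that the EasyNetQ version in use has the same SubscribeAsync(Assembly) overload used in Configure.

```csharp
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using EasyNetQ.AutoSubscribe;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service that starts the subscriptions when the host starts.
public class AutoSubscriberHostedService : IHostedService
{
    private readonly AutoSubscriber subscriber;

    public AutoSubscriberHostedService(AutoSubscriber subscriber)
    {
        this.subscriber = subscriber;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Mirrors the call made in Startup.Configure above (assumed overload).
        subscriber.SubscribeAsync(Assembly.GetExecutingAssembly());
        return Task.CompletedTask;
    }

    // Nothing to clean up here; the bus is disposed with the container.
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```

It would be registered with services.AddHostedService<AutoSubscriberHostedService>() in ConfigureServices (available from ASP.NET Core 2.1), in place of the line in Configure.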