I am setting up a project using NServiceBus and SQL Server with a publish/subscribe model, and I'm running into an issue that makes no sense. I have two messages, OpportunityMessage and PageVisitMessage, both of which are standard POCO classes marked with IEvent. They look like this:
public class OpportunityMessage : IEvent
{
    /// <summary>
    /// Gets or sets the name of the form
    /// </summary>
    public string FormName { get; set; }
    /// <summary>
    /// Gets or sets the page identifier
    /// </summary>
    public byte[] PageIdentifier { get; set; }
    /// <summary>
    /// Gets or sets any supplemental data with a form
    /// </summary>
    public string SupplementalData { get; set; }
}
public class PageVisitMessage : IEvent
{
    /// <summary>
    /// Gets or sets the IP address of the requestor
    /// </summary>
    public string IpAddress { get; set; }
    /// <summary>
    /// Gets or sets the URL that generated a page visit
    /// </summary>
    public string NavigateUrl { get; set; }
    /// <summary>
    /// Gets or sets the referrer
    /// </summary>
    public string Referrer { get; set; }
    /// <summary>
    /// Gets or sets the user agent
    /// </summary>
    public string UserAgent { get; set; }
}
From a publishing perspective, everything works as expected: messages get published based on what's stored in the Subscriptions table I've configured. However, that's where I get confused. Each message type is destined for a different queue, messaging_opportunity_in and messaging_visitors_in, but whenever either message is published, it shows up in both queues.
I did some digging and looked at the configured Subscriptions table, and each message is configured to go to both queues, so it showing up in both makes sense; however, it's not set up that way from a code perspective. Each message type has a dedicated "handler": one for PageVisitMessage and one for OpportunityMessage. The class structure for those looks like this:
public class OpportunityMessageHandler : BaseMessageHandler, IHandleMessages<OpportunityMessage>
{
    //configured in appsettings.json as "messaging_opportunity_in"
    public override string QueueName => this.Configuration["Messaging:OpportunityPublishQueue"];
    public Task Handle(OpportunityMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}
public class VisitorMessageHandler : BaseMessageHandler, IHandleMessages<PageVisitMessage>
{
    //configured in appsettings.json as "messaging_visitors_in"
    public override string QueueName => this.Configuration["Messaging:VisitorPublishQueue"];
    public Task Handle(PageVisitMessage message, IMessageHandlerContext context)
    {
        //code to process the message
    }
}
Each of those inherits from BaseMessageHandler, which is where the endpoint is configured to listen to the configured queue name via an Initialize method that looks like this:
/// <summary>
/// Initializes this message handler.
/// </summary>
/// <param name="configuration">The <see cref="IConfiguration"/> instance containing the application's configuration.</param>
public async Task Initialize(IConfiguration configuration)
{
    this.Configuration = configuration;
    //setup our base listener
    var listener = new EndpointConfiguration(this.QueueName);
    listener.EnableInstallers();
    listener.SendFailedMessagesTo($"{this.QueueName}_errors");
    //configure the SQL Server transport
    var transport = new SqlServerTransport(Configuration.GetConnectionString("CS"))
    {
        DefaultSchema = this.DefaultSchemaName
    };
    transport.SchemaAndCatalog.UseSchemaForQueue($"{this.QueueName}_errors", this.DefaultSchemaName);
    listener.UseTransport(transport);
    //configure the subscription listener
    transport.Subscriptions.DisableCaching = true;
    transport.Subscriptions.SubscriptionTableName = new NServiceBus.Transport.SqlServer.SubscriptionTableName(this.Configuration["Messaging:Subscriptions"], schema: this.DefaultSchemaName);
    //spin it up
    this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
}
What I'm trying to track down is why these two handlers get set up to listen to messages they aren't configured for. When I run the backend message processor that spins these up, the configured Subscriptions table adds both endpoints (messaging_opportunity_in and messaging_visitors_in) as listening to all messages that exist, even though their handlers are each configured to listen to a specific queue and not all of them. This essentially means every message is duplicated between the two queues, and since the handlers are running, they end up processing everything twice even though each should be responsible solely for its own message type.
Any help would be greatly appreciated!
Nowhere in the NServiceBus documentation is anything like your BaseMessageHandler class used on message handlers, and this is where the problem starts.
You haven't provided all of the code: you refer to a BaseMessageHandler but only show the Initialize() method. I'm assuming that this method is inside the BaseMessageHandler class, but the code you shared doesn't show how and when that method is called.
Let's dive into this line:
var listener = new EndpointConfiguration(this.QueueName);
First of all, the listener object should be called configuration or endpointConfiguration, because it will never listen to anything. Also, the constructor parameter is endpointName; it does not expect a queue name. For example, with the SQL Server transport your actual queue name would be something like opportunity@dbo@nservicebus.
Since you mention messaging_opportunity_in and messaging_visitors_in, without having any additional context, I would name the endpoints opportunities and visitors.
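As a rough sketch of that naming, assuming NServiceBus 8 with the NServiceBus.Transport.SqlServer package (which is what the SqlServerTransport constructor in your code suggests). The EndpointNaming class and its Create method are hypothetical helpers made up for illustration, and the connection string is whatever you already use:

```csharp
using NServiceBus;

// Hypothetical helper, not part of NServiceBus.
public static class EndpointNaming
{
    // Builds the configuration for one logical endpoint.
    // The constructor argument is the endpoint name; the transport derives
    // the actual input queue address from it.
    public static EndpointConfiguration Create(string endpointName, string connectionString)
    {
        var endpointConfiguration = new EndpointConfiguration(endpointName);
        endpointConfiguration.EnableInstallers();
        // Same error queue convention as in the question.
        endpointConfiguration.SendFailedMessagesTo($"{endpointName}_errors");
        endpointConfiguration.UseTransport(new SqlServerTransport(connectionString));
        return endpointConfiguration;
    }
}
```

You would then call EndpointNaming.Create("opportunities", connectionString) and EndpointNaming.Create("visitors", connectionString), once per endpoint.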
Now let's look at the following line:
this.EndpointInstance = await Endpoint.Start(listener).ConfigureAwait(false);
The Endpoint.Start() method returns the actual endpoint instance, the thing that processes messages. But before it starts, it scans for all types that implement the IHandleMessages<T> interface. If T is an event (in your case, a class marked with IEvent), the endpoint creates a subscription for that event.
Now, I don't know how the Initialize() method on those base classes is executed, because you did not include that code. But you execute it twice: once for opportunity and once for visitors. This means the opportunity endpoint starts scanning for classes that implement IHandleMessages<T> and finds two:
OpportunityMessageHandler
VisitorMessageHandler
It sees two messages, both of which are events:
OpportunityMessage
PageVisitMessage
The opportunity endpoint stores the fact that it wants to subscribe to both messages, resulting in the subscription you're seeing.
The next thing that happens is that the visitors endpoint is started and does EXACTLY THE SAME THING: it starts scanning, finds two message handlers and two events, and creates another subscription.
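If both handlers have to stay in the same assembly and process, one possible way to stop each endpoint from picking up the other's handler is to exclude that handler type from the endpoint's assembly scan. The following is only a sketch under assumptions: it targets NServiceBus 8, the endpoint names follow the suggestion above, the message and handler classes are cut down to the bare minimum and do not use your BaseMessageHandler, and the ScanningSketch class is a hypothetical name.

```csharp
using System.Threading.Tasks;
using NServiceBus;

public class OpportunityMessage : IEvent { }
public class PageVisitMessage : IEvent { }

public class OpportunityMessageHandler : IHandleMessages<OpportunityMessage>
{
    public Task Handle(OpportunityMessage message, IMessageHandlerContext context) => Task.CompletedTask;
}

public class VisitorMessageHandler : IHandleMessages<PageVisitMessage>
{
    public Task Handle(PageVisitMessage message, IMessageHandlerContext context) => Task.CompletedTask;
}

// Hypothetical helper class for illustration only.
public static class ScanningSketch
{
    public static EndpointConfiguration ForOpportunities(string connectionString)
    {
        var endpointConfiguration = new EndpointConfiguration("opportunities");
        endpointConfiguration.UseTransport(new SqlServerTransport(connectionString));
        // Hide the visitors handler from this endpoint's type scan, so only
        // OpportunityMessageHandler is registered here.
        endpointConfiguration.AssemblyScanner().ExcludeTypes(typeof(VisitorMessageHandler));
        return endpointConfiguration;
    }

    public static EndpointConfiguration ForVisitors(string connectionString)
    {
        var endpointConfiguration = new EndpointConfiguration("visitors");
        endpointConfiguration.UseTransport(new SqlServerTransport(connectionString));
        // Mirror image: hide the opportunities handler from this endpoint.
        endpointConfiguration.AssemblyScanner().ExcludeTypes(typeof(OpportunityMessageHandler));
        return endpointConfiguration;
    }
}
```

With only one handler visible per endpoint, each endpoint should auto-subscribe only to the event its handler processes. Rows that were already written to the Subscriptions table are, as far as I know, not cleaned up automatically, so you would likely need to remove the stale ones yourself.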
This assumes you want two components that send messages to each other, from visitors to opportunities and vice versa.
I recommend following the tutorial on the NServiceBus website and contacting Particular Software, the makers of NServiceBus. If you mention my name, we can set up a call and run through this together, and you can even request free help with a proof of concept if you'd like that.