I'm having the following issue: I need my operations to be queued so that they do not run in parallel. I therefore added a RabbitMQ queue that is fed by an API method taking requests from multiple clients. The problem is that although I set PrefetchCount to 1 both in the connection configuration and on the subscription, the tasks are still processed concurrently.
Here are my connection configuration and subscription, along with the test callback:
ConnectionConfiguration conConfig = new ConnectionConfiguration()
{
    UserName = own_s.RABBIT_USER,
    Password = own_s.RABBIT_PASSWORD,
    VirtualHost = own_s.RABBIT_VHOST,
    PrefetchCount = 1
};

await _bus.PubSub.SubscribeAsync<applypkg>(
    "topic",
    (req) => ApplyPackageService(req),
    (cfg) => cfg.WithPrefetchCount(1));

async void ApplyPackageService(applypkg msg)
{
    var g = Guid.NewGuid();
    Log.Information($"Callback started: {g}");
    await Task.Delay(20000);
    Log.Information($"Callback ended: {g}");
}
Unfortunately, it does not work as expected:
===> 08:37:41 [Information] Callback started: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:37:46 [Information] Callback started: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:37:58 [Information] Callback started: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:01 [Information] Callback ended: 6f27a664-977b-47ca-9eac-81a124066403
===> 08:38:05 [Information] Callback started: 4466ce86-6d16-4707-a54e-b166555a92d8
===> 08:38:06 [Information] Callback ended: cbb15964-f69d-47c6-9225-3c4fc39b326e
===> 08:38:18 [Information] Callback ended: 7c086690-7b7f-449c-b74e-615dda001a27
===> 08:38:25 [Information] Callback ended: 4466ce86-6d16-4707-a54e-b166555a92d8
The RabbitMQ management UI shows the correct prefetch count for the queue, so the problem has to be somewhere in the client.
Has anybody encountered similar behaviour? I've read somewhere on the forum that it depends on the EasyNetQ version, but I could not determine which one is OK.
SubscribeAsync takes an async method as the message handler. You want to handle the messages in the asynchronous ApplyPackageService function, but because you wrap it in an anonymous method, that anonymous method is your actual message handler. It calls the asynchronous ApplyPackageService method without waiting for the result (await), and since ApplyPackageService is declared async void, there is no Task that could be awaited in the first place. As far as EasyNetQ is concerned, message handling therefore completes immediately and the next message is handled.
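The effect can be reproduced without RabbitMQ. The following is only a minimal sketch: Dispatch is a hypothetical stand-in for a consumer with a prefetch count of 1 (it is not EasyNetQ code), and the names Demo, Work, FireAndForget and Awaitable are made up for illustration. The dispatcher takes the next message only after the handler's Task has completed, which is why a wrapper that returns immediately defeats it.

```csharp
using System;
using System.Threading.Tasks;

public static class Demo
{
    private static readonly object Gate = new object();
    private static int running;     // handlers currently in flight
    private static int maxRunning;  // highest overlap observed so far

    // Simulated message processing that records how many handlers overlap.
    private static async Task Work()
    {
        lock (Gate) { running++; if (running > maxRunning) maxRunning = running; }
        await Task.Delay(200);
        lock (Gate) { running--; }
    }

    // async void: the caller gets nothing back and cannot wait for completion.
    public static async void FireAndForget() => await Work();

    // Returns the Task, so the caller can await the end of processing.
    public static Task Awaitable() => Work();

    // Hypothetical consumer loop (assumption, not EasyNetQ): one message at a
    // time, moving on only when the handler's Task has completed.
    public static async Task<int> Dispatch(Func<Task> handler, int messages)
    {
        lock (Gate) { running = 0; maxRunning = 0; }
        for (int i = 0; i < messages; i++)
        {
            await handler();
        }
        await Task.Delay(500); // give fire-and-forget handlers time to finish
        lock (Gate) { return maxRunning; }
    }

    public static async Task Main()
    {
        // Wrapper around an async void method: completes immediately.
        int overlapped = await Dispatch(() => { FireAndForget(); return Task.CompletedTask; }, 3);
        // Task-returning handler passed directly: each message is awaited.
        int sequential = await Dispatch(Awaitable, 3);

        Console.WriteLine($"async void wrapper, max concurrent handlers: {overlapped}");   // 3
        Console.WriteLine($"awaited Task handler, max concurrent handlers: {sequential}"); // 1
    }
}
```

In the first run all three handlers are in flight at once even though the dispatcher never hands out more than one message at a time, which matches the overlapping "Callback started" lines in the log above.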
To handle the messages one after the other, you can add a CancellationToken parameter to your ApplyPackageService function, change the return type to Task, and use the method directly as the message handler:
await _bus.PubSub.SubscribeAsync<applypkg>(
    "topic",
    ApplyPackageService,
    (cfg) => cfg.WithPrefetchCount(1));

async Task ApplyPackageService(applypkg msg, CancellationToken ct)
{
    var g = Guid.NewGuid();
    Log.Information($"Callback started: {g}");
    await Task.Delay(20000);
    Log.Information($"Callback ended: {g}");
}