I am having a very weird issue, which is seriously messing with my head.
The underlying framework is .NET Framework 4.8 and the project is a mixture of WPF and Windows Forms, with only the latter being relevant to this question.
I am currently converting some synchronous "heavy processing" code to use the messaging pattern. I am using RabbitMQ for the messaging queues. In one case, I have a ConcurrentDictionary within a form. The queue message received by a listener is "processed", an object is created based on the "processing result", and added to the ConcurrentDictionary via the thread-safe TryAdd method. Consider the below simplified code within frmDocSplitting.vb:
Private Property _DocumentsList As New ConcurrentDictionary(Of Double, Doc)
Private Property _API As API

Public Sub New(ByVal inAPI As API)
    InitializeComponent()
    ' Store the constructor argument (inAPI), not the API type name
    _API = inAPI
    RegisterQueueHandler()
End Sub

Private Async Sub RegisterQueueHandler()
    Await _API._BusQueueControl.ReceiveAsync(Of ProcessorQueue)("ProcessorQueue" &
        _API._EnvironmentID.ToString, AddressOf ProcessQueueMessage)
End Sub

Private Async Sub ProcessQueueMessage(q As ProcessorQueue)
    Dim processorService As New ProcessorService(_API)
    Dim newDoc As Doc = Await processorService.MergeAndQueueForProcessing(q)
    Dim success As Boolean = _DocumentsList.TryAdd(newDoc.Number, newDoc)
    If success Then
        Debug.WriteLine("Added Document Number " & newDoc.Number.ToString)
    Else
        Debug.WriteLine("Failed to add " & newDoc.Number.ToString())
    End If
End Sub
The _API is an instance of the API class which contains the app state. The name of the RabbitMQ queue looks like this "ProcessorQueue44". What "MergeAndQueueForProcessing" does is not important. I believe the rest is quite clear and simple.
The ReceiveAsync method (coming from a different library) looks like this:
public async Task ReceiveAsync<T>(string queue, Action<T> onMessage)
{
    _channel.QueueDeclare(queue, true, false, false);
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += async (s, e) =>
    {
        try
        {
            byte[] array = new byte[e.Body.Span.Length];
            e.Body.Span.CopyTo(array);
            // Convert byte[] to string
            var jsonSpecified = Encoding.UTF8.GetString(array);
            var item = JsonConvert.DeserializeObject<T>(jsonSpecified);
            await Task.Run(() => onMessage(item));
            //await Task.Yield();
        }
        catch (Exception ex)
        {
            throw ex;
        }
    };
    _channel.BasicConsume(queue, true, consumer);
    await Task.CompletedTask;
    //await Task.Yield();
}
Using await Task.Yield() and await Task.CompletedTask creates the same behaviour.
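One detail of the handler wiring above is worth illustrating, although it is not necessarily related to the count problem: ProcessQueueMessage is an Async Sub passed as an Action(Of T), so `await Task.Run(() => onMessage(item))` returns as soon as the handler reaches its first incomplete await, not when the document has been added. The following is a minimal, self-contained C# sketch of that effect. The names `AsyncVoidDemo`, `gate` and `done` are hypothetical and only stand in for the real handler and its awaited processing call.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns (finished when the await returned, finished after release).
    public static async Task<(bool, bool)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>();  // holds the handler at its first await
        var done = new TaskCompletionSource<bool>();  // signalled when the handler really finishes
        bool finished = false;

        // An async lambda assigned to Action<T> is "async void",
        // just like a VB "Async Sub" passed via AddressOf.
        Action<int> onMessage = async item =>
        {
            await gate.Task;       // stands in for the awaited processing call
            finished = true;
            done.SetResult(true);
        };

        // Same shape as the consumer callback: this await completes once
        // onMessage hits its first incomplete await, not when it finishes.
        await Task.Run(() => onMessage(1));
        bool finishedWhenAwaitReturned = finished;

        gate.SetResult(true);      // let the handler continue
        await done.Task;           // wait until the handler has really finished
        return (finishedWhenAwaitReturned, finished);
    }

    public static void Main()
    {
        var (early, late) = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine($"finished when await returned: {early}; finished later: {late}");
    }
}
```

If the caller needs to know when processing has completed, one option (an assumption about what the library could accept, not something it currently does) is to take a `Func<T, Task>` instead of an `Action<T>` and await it directly.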
This is the behaviour I am observing:
After step 8, when accessing _DocumentsList from elsewhere, the count is 1, i.e. the TryAdd from the context of the second worker thread appears to have overwritten the first.
Note that the steps above may indicate sequential access but in actual fact, the messages may be received and handled concurrently. This is after all the reason why I am using a ConcurrentDictionary instead of a normal dictionary.
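For reference, here is a small C# sketch of what TryAdd guarantees on a single ConcurrentDictionary instance: concurrent adds with distinct keys are all kept, and an add with an existing key returns false instead of overwriting. The class name `TryAddDemo` and the string values are hypothetical, standing in for the form's dictionary of Doc objects.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class TryAddDemo
{
    public static (int count, bool secondAdd, string kept) Run()
    {
        var docs = new ConcurrentDictionary<double, string>();

        // Many threads adding distinct keys: every add succeeds, none is lost.
        Parallel.For(0, 1000, i => { docs.TryAdd(i, "doc " + i); });

        // Adding an existing key does not overwrite; it returns false.
        bool secondAdd = docs.TryAdd(1, "replacement");
        return (docs.Count, secondAdd, docs[1]);
    }

    public static void Main()
    {
        var (count, secondAdd, kept) = Run();
        Console.WriteLine($"{count} {secondAdd} {kept}");  // prints "1000 False doc 1"
    }
}
```

So a count of 1 after two successful TryAdd calls with different keys suggests that the two calls did not hit the same dictionary instance, rather than that one entry replaced the other.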
Changing the _DocumentsList to "Shared" (static) "solves" the problem, but this approach is not usable, as I need each instance of the form to have its own unique version of _DocumentsList. I tried other approaches of having a shared ConcurrentDictionary with a unique "session" ID for each form (and the current ConcurrentDictionary as its value), but apart from adding unnecessary complexity to what I am trying to do (which I believe should be rather simple), it also does not work (observing the same behaviour as explained in steps 1-8).
What am I missing / doing wrong?
Update
I tried using a SyncLock around the TryAdd call, but I still got the same behaviour. I also tried creating the RabbitQueueBus with the DispatchConsumersAsync parameter set to false and changing the consumers to non-async, also to no avail.
I managed to fix the issue. The API class was holding a reference to the _BusQueueControl (of type RabbitQueueBus), and the same RabbitQueueBus was being used by two instances of the form without anything being cleaned up between runs, which caused the problem. I made _BusQueueControl an instance member of the form (so that each new form creates a new RabbitQueueBus instance) and added handling to delete the consumers and the channel when the form closes.
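One possible reading of why this helped, which is an assumption on my part rather than something I verified on the broker: a consumer left over from the earlier form instance was still registered on the same queue, and RabbitMQ dispatches a queue's messages round-robin among its consumers, so part of the messages ended up in the old form's dictionary. The C# sketch below simulates that with purely hypothetical in-memory stand-ins (`FakeQueue`, `FakeForm`); it does not use the RabbitMQ client at all.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for one broker queue: each message goes to
// exactly one registered consumer, in round-robin order.
public sealed class FakeQueue
{
    private readonly List<Action<double>> _consumers = new List<Action<double>>();
    private int _next;

    // Registers a consumer and returns an action that cancels it again.
    public Action Subscribe(Action<double> handler)
    {
        _consumers.Add(handler);
        return () => _consumers.Remove(handler);
    }

    public void Publish(double docNumber)
    {
        if (_consumers.Count == 0) return;
        _consumers[_next++ % _consumers.Count](docNumber);
    }
}

// Hypothetical stand-in for the form: owns its own dictionary and one consumer.
public sealed class FakeForm
{
    public ConcurrentDictionary<double, string> Documents { get; } =
        new ConcurrentDictionary<double, string>();
    private readonly Action _cancelConsumer;

    public FakeForm(FakeQueue queue)
    {
        _cancelConsumer = queue.Subscribe(n => Documents.TryAdd(n, "doc " + n));
    }

    // cleanUp = true models cancelling the consumer when the form closes.
    public void Close(bool cleanUp)
    {
        if (cleanUp) _cancelConsumer();
    }
}

public static class SharedBusDemo
{
    public static int SecondFormCount(bool cleanUpOnClose)
    {
        var queue = new FakeQueue();
        var first = new FakeForm(queue);
        first.Close(cleanUpOnClose);      // the first form is closed by the user
        var second = new FakeForm(queue);

        queue.Publish(1);
        queue.Publish(2);
        return second.Documents.Count;
    }

    public static void Main()
    {
        Console.WriteLine(SecondFormCount(cleanUpOnClose: false));  // prints 1
        Console.WriteLine(SecondFormCount(cleanUpOnClose: true));   // prints 2
    }
}
```

In the real code, the cleanup step would presumably map to cancelling the consumer with the tag returned by BasicConsume (BasicCancel in RabbitMQ.Client 6.x) and closing the channel when the form closes.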