asynchronous f# agent mailboxprocessor lifo

A MailboxProcessor that operates with LIFO logic


I am learning about F# agents (MailboxProcessor).

I am dealing with a rather unconventional problem.

I am exploring ways to deal with this problem.

The first idea is to implement a stack (LIFO) in dataSource. dataSource would send over the latest observation available when dataProcessor becomes available to receive and process the data. This solution may work, but it may get complicated: dataProcessor may need to be blocked and re-activated, and it would have to communicate its status to dataSource, leading to a two-way communication problem. This may boil down to a blocking queue in the producer-consumer problem, but I am not sure.

The second idea is to have dataProcessor take care of message sorting. In this architecture, dataSource simply posts updates to dataProcessor's queue, and dataProcessor uses Scan to fetch the latest data available in its queue. This may be the way to go. However, I am not sure whether the current design of MailboxProcessor makes it possible to clear a queue of messages, deleting the older, obsolete ones. Furthermore, here, it is written that:

Unfortunately, the TryScan function in the current version of F# is broken in two ways. Firstly, the whole point is to specify a timeout but the implementation does not actually honor it. Specifically, irrelevant messages reset the timer. Secondly, as with the other Scan function, the message queue is examined under a lock that prevents any other threads from posting for the duration of the scan, which can be an arbitrarily long time. Consequently, the TryScan function itself tends to lock-up concurrent systems and can even introduce deadlocks because the caller's code is evaluated inside the lock (e.g. posting from the function argument to Scan or TryScan can deadlock the agent when the code under the lock blocks waiting to acquire the lock it is already under).

Having the latest observation bounced back may be a problem. The author of this post, @Jon Harrop, suggests that

I managed to architect around it and the resulting architecture was actually better. In essence, I eagerly Receive all messages and filter using my own local queue.
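
To make that idea concrete, here is a minimal sketch of an agent that eagerly receives everything in its inbox and keeps only the newest message. It is my own illustration, not the quoted author's code: `latestOnlyAgent` and `processLatest` are hypothetical names, and the "local queue" is reduced to a single "latest" value on the assumption that older observations are simply obsolete.

```fsharp
// Sketch: an agent that drains its inbox and processes only the newest message.
// `latestOnlyAgent` and `processLatest` are hypothetical names for illustration.
let latestOnlyAgent (processLatest: 'T -> Async<unit>) =
    MailboxProcessor<'T>.Start(fun inbox ->
        // Receive everything that is already queued, remembering only the newest.
        let rec drain latest = async {
            if inbox.CurrentQueueLength > 0 then
                let! next = inbox.Receive()
                return! drain next
            else
                return latest }
        let rec loop () = async {
            let! first = inbox.Receive()   // wait until at least one message arrives
            let! latest = drain first      // skip everything older than the newest
            do! processLatest latest
            return! loop () }
        loop ())
```

Because only the agent itself reads from the inbox, this avoids Scan and TryScan entirely; messages that arrive while `processLatest` is running are collapsed into one on the next iteration.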

This idea is surely worth exploring but, before starting to play around with code, I would welcome some input on how I could structure my solution.

Thank you.


Solution

  • tl;dr I would try this: take the Mailbox implementation from FSharp.Actor or Zach Bray's blog post, replace ConcurrentQueue with ConcurrentStack (plus add some bounded capacity logic), and use this changed agent as a dispatcher to pass messages from dataSource to an army of dataProcessors implemented as ordinary MBPs or Actors.

    tl;dr2 If workers are a scarce and slow resource, and we need to process whichever message is the latest at the moment a worker becomes ready, then it all boils down to an agent with a stack instead of a queue (with some bounded capacity logic) plus a BlockingQueue of workers. The dispatcher dequeues a ready worker, then pops a message from the stack and sends this message to the worker. After the job is done, the worker enqueues itself to the queue when it becomes ready (e.g. before let! msg = inbox.Receive()). The dispatcher's consumer thread then blocks until any worker is ready, while the producer thread keeps the bounded stack updated. (A bounded stack could be done with an array + offset + size inside a lock; the approach below is more complex than that.)
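
    The tl;dr2 design could be sketched roughly as follows. This is an illustration under assumptions, not a tested implementation: `BoundedStack`, `startDispatcher` and `startWorker` are hypothetical names, the bounded stack uses a LinkedList under a lock rather than the array + offset + size variant mentioned above, and the "BlockingQueue of workers" is a plain BlockingCollection (which is FIFO by default).

    ```fsharp
    open System.Collections.Concurrent
    open System.Collections.Generic
    open System.Threading

    /// Hypothetical bounded LIFO buffer: keeps at most `capacity` of the newest items.
    type BoundedStack<'T>(capacity: int) =
        let items = LinkedList<'T>()
        let gate = obj ()
        /// Pushes an item, silently dropping the oldest one when over capacity.
        member _.Push(item: 'T) =
            lock gate (fun () ->
                items.AddFirst(item) |> ignore
                if items.Count > capacity then items.RemoveLast()
                Monitor.PulseAll gate)
        /// Blocks until an item is available, then returns the newest one.
        member _.Pop() : 'T =
            lock gate (fun () ->
                while items.Count = 0 do
                    Monitor.Wait gate |> ignore
                let newest = items.First.Value
                items.RemoveFirst()
                newest)

    /// Dispatcher: waits for an idle worker, then hands it the newest message.
    let startDispatcher (stack: BoundedStack<'T>) (ready: BlockingCollection<MailboxProcessor<'T>>) =
        let thread =
            Thread(ThreadStart(fun () ->
                while true do
                    let worker = ready.Take()   // block until some worker is idle
                    let msg = stack.Pop()       // then take the newest message available
                    worker.Post msg))
        thread.IsBackground <- true
        thread.Start()

    /// Worker: an ordinary MailboxProcessor that announces itself as ready
    /// before each Receive, so it never holds more than one message.
    let startWorker (ready: BlockingCollection<MailboxProcessor<'T>>) (handle: 'T -> unit) =
        MailboxProcessor<'T>.Start(fun inbox ->
            let rec loop () = async {
                ready.Add inbox
                let! msg = inbox.Receive()
                handle msg
                return! loop () }
            loop ())
    ```

    The producer only ever calls `Push`, so it never blocks on slow workers; stale observations fall off the bottom of the stack instead of piling up in a worker's mailbox.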

    Details

    MailboxProcessor is designed to have only one consumer. This is even commented in the source code of MBP here (search for the word 'DRAGONS' :) )

    If you post your data to an MBP, then only one thread can take it from the internal queue or stack. In your particular use case I would use ConcurrentStack directly or, better, wrapped in a BlockingCollection:
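
    As a small sketch of that wrapping (the name `latestFirst` and the integer observations are made up for illustration): BlockingCollection accepts any IProducerConsumerCollection as its backing store, so passing a ConcurrentStack makes Take return the most recently added item.

    ```fsharp
    open System.Collections.Concurrent

    // Backing the BlockingCollection with a ConcurrentStack gives LIFO order on Take().
    let latestFirst = new BlockingCollection<int>(ConcurrentStack<int>())

    // Producer side: post observations as they arrive.
    for observation in 1 .. 5 do
        latestFirst.Add observation

    // Consumer side: Take() blocks while the collection is empty,
    // then returns the newest observation (5 here).
    let newest = latestFirst.Take()
    ```

    Note that the bounded-capacity constructor overload of BlockingCollection makes Add block when the collection is full rather than discarding old items, so dropping obsolete observations still needs separate logic.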

    BlockingCollection has methods like AddToAny/TakeFromAny, which work on arrays of BlockingCollections. This could help, e.g.:

    Something like this:

                (data stream produces 'T)
                    |
                [dispatcher's BCCS]
                    |
                (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                     |                               |
                [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                     |                               |
                   (process)                         (process)
    

    Instead of ConcurrentStack, you may want to read about the heap data structure. If you need the latest message according to some property of the messages, e.g. a timestamp, rather than the order in which they arrive at the stack (e.g. if there could be delays in transit, so that arrival order <> creation order), you can get the latest message by using a heap.
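
    A minimal sketch of the heap idea, assuming .NET 6 or later (where System.Collections.Generic.PriorityQueue is available). The `Observation` record and its fields are hypothetical. PriorityQueue is a min-heap and is not thread-safe, so the comparer is reversed to dequeue the newest timestamp first, and in a concurrent setting access would need to be guarded by a lock.

    ```fsharp
    open System
    open System.Collections.Generic

    // Hypothetical message type carrying its creation time.
    type Observation = { Timestamp: DateTime; Value: float }

    // Reverse the default ordering so the largest (newest) timestamp is dequeued first.
    let newestFirst = Comparer<DateTime>.Create(fun a b -> compare b a)
    let heap = PriorityQueue<Observation, DateTime>(newestFirst)

    let t0 = DateTime(2024, 1, 1)

    // Arrival order differs from creation order.
    for obs in [ { Timestamp = t0.AddSeconds 2.0; Value = 2.0 }
                 { Timestamp = t0.AddSeconds 3.0; Value = 3.0 }
                 { Timestamp = t0.AddSeconds 1.0; Value = 1.0 } ] do
        heap.Enqueue(obs, obs.Timestamp)

    // The observation created last, regardless of when it arrived.
    let latest = heap.Dequeue()
    ```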

    If you still need Agents semantics/API, you could read several sources in addition to Dave's links, and somehow adapt the implementation to multiple concurrent consumers:

    I have a somewhat similar use case, and for the last two days I have researched everything I could find on F# Agents/Actors. This answer is a kind of TODO for myself to try these ideas, half of which were born while writing it.