Tags: asynchronous, events, f#, agents, mailboxprocessor

F# events not working inside Async workflow


I want to do a Post-Fire-Reply to an agent: the agent triggers an event and then replies to the caller. However, I either keep getting a timeout error or the events do not fire correctly. I tried Post-Fire instead, which stopped the timeout errors, but the events still do not fire.

let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int

let agent = Agent.Start(fun inbox -> 
    let rec loop() = async {
        let! msg = inbox.Receive()
        let (Fire i) = msg
        evt.Trigger i }
    loop())

let on i fn = 
    stream 
    |> Observable.filter (fun x -> x = i) 
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe (fun x -> fn x) 

let rec collatz n =
    printfn "%d" n
    on n (fun i -> 
        if (i % 2 = 0) then collatz (i/2)
        else collatz (3*n + 1)) |> ignore

    agent.Post (Fire n) // this does not work
    // evt.Trigger n // this works


collatz 13    

This is a simple experiment that repeatedly creates a function to find the next number in the Collatz series and then calls itself to return the value until it reaches 1.

What seems to happen is that the trigger only fires once. I experimented with every combination of Async.RunSynchronously / Async.Start / StartChild / SynchronizationContext that I could think of, but made no progress. I found a blog post similar to what I am doing, but that didn't help me either.

EDIT: Thank you, Fyodor Soikin, for pointing out my oversight. The original problem remains: I want to both fire events and reply with a result, but I get a timeout.

let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Command = 
    | Fire of int
    | Get of int * AsyncReplyChannel<int>

let agent = Agent.Start(fun inbox -> 
    let rec loop() = async {
        let! msg = inbox.Receive()
        match msg with
        | Fire i -> evt.Trigger i 
        | Get (i,ch) -> 
            evt.Trigger i 
            ch.Reply(i)
        return! loop() }
    loop())

let on i fn = 
    stream 
    |> Observable.filter (fun x -> x = i) 
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe (fun x -> fn x) 

let rec collatz n =
    printfn "%d" n
    on n (fun i -> 
        if (i % 2 = 0) then collatz (i/2)
        else collatz (3*n + 1)) |> ignore

    agent.PostAndReply (fun ch -> (Get (n, ch))) |> ignore // timeout
    agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.Ignore |> Async.Start // works but I need the result
    agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.RunSynchronously |> ignore // timeout

collatz 13

Solution

  • Your loop function doesn't loop. It receives the first message, triggers the event, and then just... exits. Never attempts to receive a second message.

    You need to make that function work continuously: process the first message, then go right back to receive the next one, then go receive the next one, and so on. Like this:

    let agent = Agent.Start(fun inbox -> 
        let rec loop() = async {
            let! msg = inbox.Receive()
            let (Fire i) = msg
            evt.Trigger i
            return! loop() }
        loop())
    

    Edit

    Since you've reached your limit on questions, I will answer your edit here.

    The reason you're getting timeouts in your second snippet is that you have a deadlock in your code. Let's trace the execution to see that.

    1. THREAD 1: The agent is started.
    2. THREAD 2: The first collatz call.
    3. THREAD 2: The first collatz call posts a message to the agent.
    4. THREAD 1: The agent receives the message.
    5. THREAD 1: The agent triggers the event.
    6. THREAD 1: As a result of the event, the second collatz call happens.
    7. THREAD 1: The second collatz call posts a message to the agent.
    8. THREAD 1: The second collatz call starts waiting for the agent to respond.

    And this is where the execution ends. The agent cannot respond at this point (in fact, it cannot even receive the next message!), because its instruction pointer is still inside evt.Trigger. The evt.Trigger call hasn't yet returned, so the loop function hasn't yet recursed, so the inbox.Receive function hasn't yet been called, so the second message is still waiting in the agent's queue.

    So you get yourself a classic deadlock: collatz is waiting for the agent to receive its message, but the agent is waiting for collatz to finish handling the event.
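If you want to see the deadlock without hanging your session, here is a minimal sketch of the same situation. It is not your code: the `Msg` type, the `nested` cell and the 500 ms timeout are made up for illustration, and the nested request uses `TryPostAndReply` so that it gives up and returns `None` instead of waiting forever.

```fsharp
// Minimal reproduction sketch of the deadlock (names and timeout are illustrative).
type Msg = Get of int * AsyncReplyChannel<int>

let evt = Event<int>()

let agent = MailboxProcessor.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        let (Get (i, ch)) = msg
        evt.Trigger i   // handlers run right here, inside the agent's loop
        ch.Reply i
        return! loop () }
    loop ())

// Holds the result of the nested request made from inside the event handler.
let nested : int option ref = ref (Some 0)

evt.Publish.Add(fun i ->
    if i = 1 then
        // The agent is still inside evt.Trigger, so it cannot receive this message.
        nested.Value <- agent.TryPostAndReply((fun ch -> Get (2, ch)), timeout = 500))

// The outer request is answered only after the nested one has timed out.
let outer = agent.PostAndReply(fun ch -> Get (1, ch))
printfn "outer = %d, nested = %A" outer nested.Value
```

The nested request comes back as `None` because the agent cannot reach `inbox.Receive()` until the handler returns, while the outer request still gets its reply of 1 once the handler has given up.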

    The simplest, dumbest solution to this would be to just trigger the event asynchronously:

        async { evt.Trigger i } |> Async.Start
    

    This will make sure that the event handler is executed not "right there", but asynchronously, possibly on a different thread. This will in turn allow the agent not to wait for the event to be processed before it can continue its own execution loop.
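Applied to your second snippet, that change could look roughly like the sketch below. The `seen` queue and the `finished` signal are hypothetical helpers, added only so the run can be observed from the main thread, and `collatz` is assumed to return the agent's reply.

```fsharp
open System.Collections.Concurrent
open System.Threading

let evt = Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Command =
    | Fire of int
    | Get of int * AsyncReplyChannel<int>

let agent = Agent.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Fire i -> async { evt.Trigger i } |> Async.Start
        | Get (i, ch) ->
            // Hand the event off so the handlers do not run inside the agent's loop.
            async { evt.Trigger i } |> Async.Start
            ch.Reply i
        return! loop () }
    loop ())

let on i fn =
    stream
    |> Observable.filter (fun x -> x = i)
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe fn

// Hypothetical helpers, only there to observe the run from the main thread.
let seen = ConcurrentQueue<int>()
let finished = new ManualResetEventSlim(false)

let rec collatz n : int =
    seen.Enqueue n
    if n = 1 then finished.Set()
    on n (fun i ->
        collatz (if i % 2 = 0 then i / 2 else 3 * i + 1) |> ignore) |> ignore
    // Blocking is safe now: the agent is free to receive this message.
    agent.PostAndReply(fun ch -> Get (n, ch))

let first = collatz 13
finished.Wait(10000) |> ignore   // give the chain up to 10 seconds to reach 1
printfn "%A" (List.ofSeq seen)
```

Here `PostAndReply` returns the value you asked for (13 for the first call), and the rest of the series is produced by handlers running on thread-pool threads. Keep in mind that events started this way are not guaranteed to be raised in the order the agent processed the messages.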

    In general though, when dealing with multithreading and asynchrony, one should never call unknown code directly. The agent should never directly call evt.Trigger, or anything else that it doesn't control, because that code might be waiting on the agent itself (which is what happened in your case), thus introducing the deadlock.
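One way to follow that rule, sketched here under assumptions of my own, is to give the event its own agent. The `notifier` agent below is hypothetical: the main agent only posts to it (fire-and-forget) and replies immediately, so a handler that blocks on the main agent no longer stalls it. Because `notifier` raises events one at a time, they also stay in the order the main agent produced them.

```fsharp
open System.Threading

type Command = Get of int * AsyncReplyChannel<int>

let evt = Event<int>()

// Hypothetical second agent whose only job is to raise events, one at a time.
let notifier = MailboxProcessor<int>.Start(fun inbox ->
    let rec loop () = async {
        let! i = inbox.Receive()
        evt.Trigger i          // handlers may block here without stalling `agent`
        return! loop () }
    loop ())

let agent = MailboxProcessor.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        let (Get (i, ch)) = msg
        notifier.Post i        // fire-and-forget: never waits for the handlers
        ch.Reply i
        return! loop () }
    loop ())

// A handler that calls back into the agent and blocks until it gets a reply.
let nested = ref 0
let handled = new ManualResetEventSlim(false)

evt.Publish.Add(fun i ->
    if i = 1 then
        nested.Value <- agent.PostAndReply(fun ch -> Get (2, ch))
        handled.Set())

let outer = agent.PostAndReply(fun ch -> Get (1, ch))
handled.Wait(10000) |> ignore   // wait for the handler to finish its nested call
printfn "outer = %d, nested = %d" outer nested.Value
```

The trade-off is that a slow handler now delays later events rather than the agent itself, which is usually the lesser problem.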