I want to do a Post-Fire-Reply to an agent: the agent triggers an event and then replies to the caller. However, I keep getting either a timeout error or events that do not fire correctly. I tried a plain Post-Fire instead, which stopped the timeout errors, but the events still do not fire.
let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int

let agent = Agent.Start(fun inbox ->
    let rec loop() = async {
        let! msg = inbox.Receive()
        let (Fire i) = msg
        evt.Trigger i }
    loop())

let on i fn =
    stream
    |> Observable.filter (fun x -> x = i)
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe (fun x -> fn x)

let rec collatz n =
    printfn "%d" n
    on n (fun i ->
        if (i % 2 = 0) then collatz (i/2)
        else collatz (3*n + 1)) |> ignore
    agent.Post (Fire n) // this does not work
    // evt.Trigger n // this does work

collatz 13
This is a simple experiment that repeatedly creates a function to find the next number in the Collatz series and then calls itself to return the value until it reaches 1.
What seems to happen is that the trigger only fires once. I experimented with every combination of Async.RunSynchronously / Async.Start / StartChild / SynchronizationContext that I could think of, but made no progress. I found a blog post describing something similar to what I am doing, but that didn't help me either.
EDIT Thank you Fyodor Soikin for pointing out my oversight. The original problem still remains in that I wish to both fire events and reply with a result, but get a timeout.
let evt = new Event<int>()
let stream = evt.Publish

type Agent<'T> = MailboxProcessor<'T>
type Command =
    | Fire of int
    | Get of int * AsyncReplyChannel<int>

let agent = Agent.Start(fun inbox ->
    let rec loop() = async {
        let! msg = inbox.Receive()
        match msg with
        | Fire i -> evt.Trigger i
        | Get (i,ch) ->
            evt.Trigger i
            ch.Reply(i)
        return! loop() }
    loop())

let on i fn =
    stream
    |> Observable.filter (fun x -> x = i)
    |> Observable.filter (fun x -> x <> 1)
    |> Observable.subscribe (fun x -> fn x)

let rec collatz n =
    printfn "%d" n
    on n (fun i ->
        if (i % 2 = 0) then collatz (i/2)
        else collatz (3*n + 1)) |> ignore
    agent.PostAndReply (fun ch -> (Get (n, ch))) |> ignore // timeout
    agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.Ignore |> Async.Start // works but I need the result
    agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.RunSynchronously |> ignore // timeout

collatz 13
Your loop function doesn't loop. It receives the first message, triggers the event, and then just... exits. It never attempts to receive a second message.
You need to make that function work continuously: process the first message, then go right back to receive the next one, then go receive the next one, and so on. Like this:
let agent = Agent.Start(fun inbox ->
    let rec loop() = async {
        let! msg = inbox.Receive()
        let (Fire i) = msg
        evt.Trigger i
        return! loop() }
    loop())
Since you've reached your limit on questions, I will answer your edit here.
The reason you're getting timeouts in your second snippet is that you have a deadlock in your code. Let's trace the execution to see that.
1. The first collatz call happens.
2. That collatz call posts a message to the agent.
3. The agent receives the message and triggers the event. The event handler runs inside evt.Trigger, and from there a second collatz call happens.
4. The second collatz call posts a message to the agent.
5. The second collatz call starts waiting for the agent to respond.

And this is where the execution ends. The agent cannot respond at this point (in fact, it cannot even receive the next message!), because its instruction pointer is still inside evt.Trigger. The evt.Trigger call hasn't yet returned, so the loop function hasn't yet recursed, so the inbox.Receive function hasn't yet been called, so the second message is still waiting in the agent's queue.
So you get yourself a classic deadlock: collatz is waiting for the agent to receive its message, but the agent is waiting for collatz to finish handling the event.
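To see the deadlock without hanging the program, here is a minimal sketch that makes the nested request with a timeout. It assumes a cut-down Command type with only the Get case; the 500 ms timeout, the hard-coded values 13 and 40, and the `nested` cell are illustrative choices, not part of the original code. The handler's request from inside evt.Trigger should time out, because the agent cannot receive it until the handler returns.

```fsharp
type Command =
    | Get of int * AsyncReplyChannel<int>

let evt = new Event<int>()

// Agent that triggers the event synchronously, as in the question.
let agent = MailboxProcessor.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Get (i, ch) ->
            evt.Trigger i   // handlers run right here, inside the agent's loop
            ch.Reply i
        return! loop () }
    loop ())

// Holds the result of the nested request made from inside the handler.
// Starts as Some 0 so that a timeout (None) is distinguishable.
let nested : int option ref = ref (Some 0)

evt.Publish.Add(fun i ->
    if i = 13 then
        // Re-entrant request: the agent is still inside evt.Trigger,
        // so nobody can receive this message before the timeout expires.
        nested.Value <- agent.TryPostAndReply((fun ch -> Get (40, ch)), 500))

let outer = agent.PostAndReply(fun ch -> Get (13, ch))
printfn "outer = %d, nested call timed out = %b" outer (Option.isNone nested.Value)
```

The outer call only gets its reply once the nested one has given up, which is the same wait that becomes a timeout in your snippet.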
The simplest, dumbest solution to this would be to just trigger the event asynchronously:
async { evt.Trigger i } |> Async.Start
This will make sure that the event handler is executed not "right there", but asynchronously, possibly on a different thread. This will in turn allow the agent not to wait for the event to be processed before it can continue its own execution loop.
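Put together, the change might look like the sketch below. It is a self-contained variant rather than your exact code: it assumes a Command type with only the Get case, registers a single handler instead of subscribing per number, and the `seen` queue and `finished` signal are hypothetical helpers added so the main thread can wait for the chain to complete.

```fsharp
open System.Collections.Concurrent
open System.Threading

type Command =
    | Get of int * AsyncReplyChannel<int>

let evt = new Event<int>()

let agent = MailboxProcessor.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Get (i, ch) ->
            // Run the handlers on the thread pool so the loop keeps going.
            async { evt.Trigger i } |> Async.Start
            ch.Reply i
        return! loop () }
    loop ())

// Hypothetical helpers for this demo: record values and signal completion.
let seen = ConcurrentQueue<int>()
let finished = new ManualResetEventSlim(false)

evt.Publish.Add(fun i ->
    seen.Enqueue i
    if i = 1 then finished.Set()
    else
        let next = if i % 2 = 0 then i / 2 else 3 * i + 1
        // Re-entrant blocking call: fine now, because the agent is not
        // sitting inside evt.Trigger while this handler runs.
        agent.PostAndReply(fun ch -> Get (next, ch)) |> ignore)

let reply = agent.PostAndReply(fun ch -> Get (13, ch))
finished.Wait(5000) |> ignore   // wait up to 5 s for the chain to reach 1
printfn "reply = %d, seen = %A" reply (List.ofSeq seen)
```

Here the caller gets its reply (13) back, and the handlers should walk the Collatz sequence from 13 down to 1. Note that the reply is now sent without waiting for the handlers to finish, so it only confirms that the event was scheduled.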
In general though, when dealing with multithreading and asynchrony, one should never call unknown code directly. The agent should never directly call evt.Trigger, or anything else that it doesn't control, because that code might be waiting on the agent itself (which is what happened in your case), thus introducing the deadlock.