I am coding a long-running operation in F#'s task CE as follows:
let longRunningTask = Task.Run(...)
// Now let's do the rest of the multi-tasking program
task {
    DO SOMETHING
    let! result = longRunningTask
    DO SOMETHING ELSE
}
The problem is that DO SOMETHING ELSE appears to run on an arbitrary thread (I also observed this by printing the current thread id), whereas I absolutely need it to run on the same thread as DO SOMETHING, because I don't want any form of concurrency other than longRunningTask.
I've tried in many ways to set the current synchronization context, creating first a unique value of that type, but that doesn't seem to affect the result.
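It might be overkill, but a custom SynchronizationContext may help you. It is used to dispatch delegates to specific threads. The base class does not pin anything to a thread (its Post just queues the callback to the thread pool), so you need a derived context that owns a thread. There are plenty of explanations of how it works (search for ConfigureAwait(false)), so I'll focus on the implementation: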
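open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

type ThreadOwningSyncCtx() =
    inherit SynchronizationContext()

    // callbacks waiting to run on the thread that calls DoWork
    let _queue = new BlockingCollection<(SendOrPostCallback * obj)>()

    member _.DoWork(cancellationToken: CancellationToken) =
        while not cancellationToken.IsCancellationRequested do
            let (callback, state) = _queue.Take()
            callback.Invoke state

    override _.Post(callback, state) =
        _queue.Add((callback, state))

    override _.Send(callback, state) =
        // the non-generic TaskCompletionSource needs .NET 5 or later
        let tcs = TaskCompletionSource()
        // build the delegate explicitly: an F# function is not converted to SendOrPostCallback inside a tuple
        let cb =
            SendOrPostCallback(fun s ->
                callback.Invoke s
                tcs.SetResult())
        _queue.Add((cb, state))
        // blocks until DoWork has run the callback; this deadlocks if called from the DoWork thread itself
        tcs.Task.Wait()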
Notes on methods:
Post: Runs on the async path. The Task infrastructure calls it when a C# await or an F# let!/do! completes asynchronously. The callback is queued and executed some time later.
Send: Runs on the sync path. The callback is expected to have executed before this method returns, for example when someone calls CancellationTokenSource.Cancel, WPF's Dispatcher.Invoke, or WinForms' Control.Invoke.
DoWork: Blocks the current thread and executes queued callbacks on it. We can't interrupt a thread to make it run something, so the thread has to sit there waiting for work.
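If you want to see which of the two entry points the infrastructure actually uses, a small recording context can help. This is only a sketch: RecordingContext and recordCalls are names I made up, and the context just logs the call and then falls back to the base class behavior (thread pool for Post, inline for Send), so it does not pin anything to a thread.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: logs which entry point was called, then defers to the base class.
type RecordingContext() =
    inherit SynchronizationContext()
    let calls = ResizeArray<string>()
    member _.Calls = calls
    override _.Post(callback, state) =
        calls.Add "Post"
        base.Post(callback, state)   // base implementation queues to the thread pool
    override _.Send(callback, state) =
        calls.Add "Send"
        base.Send(callback, state)   // base implementation invokes the callback inline

let recordCalls () =
    let ctx = RecordingContext()
    let previous = SynchronizationContext.Current
    SynchronizationContext.SetSynchronizationContext ctx
    try
        use cts = new CancellationTokenSource()
        // useSynchronizationContext = true captures the current context for this callback
        use _registration = cts.Token.Register(Action(fun () -> ()), true)
        // Task.Yield always completes asynchronously, so its continuation is handed to the context
        let yielded = task { do! Task.Yield() }
        yielded.Wait()
        // cancelling runs the registered callback through the captured context
        cts.Cancel()
        List.ofSeq ctx.Calls
    finally
        SynchronizationContext.SetSynchronizationContext previous

printfn "%A" (recordCalls ())
```

The await should show up as a Post and the cancellation callback as a Send, which matches the notes above.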
Usage:
let syncCtx = ThreadOwningSyncCtx()
// set current sync ctx, so every continuation is queued back to main thread.
// comment this line and `printThreadId` will return different numbers
SynchronizationContext.SetSynchronizationContext syncCtx
let printThreadId() =
    printfn "%d" Thread.CurrentThread.ManagedThreadId
// create cancellation token, so app won't run indefinitely
let cts = new CancellationTokenSource()
// task to simulate some meaningful work
task {
    printThreadId()
    do! Task.Yield() // this action always completes asynchronously
    printThreadId()
    cts.Cancel() // cancel the token so the main thread can continue its work
} |> ignore
// process all pending continuations
syncCtx.DoWork(cts.Token)
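Applied to the shape of the code in the question, a minimal sketch could look like the following. PumpContext, runOnPump, the 100 ms sleep and the value 42 are all made up for illustration; the context is a trimmed-down variant of ThreadOwningSyncCtx that stops through CompleteAdding instead of a cancellation token and does not override Send.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

/// Hypothetical single-threaded context: queued callbacks run on whichever thread calls Run.
type PumpContext() =
    inherit SynchronizationContext()
    let queue = new BlockingCollection<SendOrPostCallback * obj>()
    override _.Post(callback, state) = queue.Add((callback, state))
    /// Blocks the calling thread and runs callbacks until Complete has been called and the queue is empty.
    member _.Run() =
        for (callback, state) in queue.GetConsumingEnumerable() do
            callback.Invoke state
    member _.Complete() = queue.CompleteAdding()

let runOnPump () =
    let ctx = PumpContext()
    let previous = SynchronizationContext.Current
    SynchronizationContext.SetSynchronizationContext ctx
    try
        // stands in for the long-running operation; it runs on a thread pool thread
        let longRunningTask =
            Task.Run<int>(fun () ->
                Thread.Sleep 100
                42)
        let work =
            task {
                let before = Environment.CurrentManagedThreadId   // DO SOMETHING
                let! result = longRunningTask
                let after = Environment.CurrentManagedThreadId    // DO SOMETHING ELSE
                ctx.Complete()                                    // let Run return
                return before, after, result
            }
        ctx.Run()     // pump continuations on the current thread
        work.Result
    finally
        SynchronizationContext.SetSynchronizationContext previous

let before, after, result = runOnPump ()
printfn "before=%d after=%d result=%d" before after result
```

The part before let! runs synchronously on the calling thread, and the continuation is posted back to the same thread through ctx.Run(), so both thread ids should match while the Task.Run body still executes elsewhere.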