.netf#task-parallel-librarycomputation-expression

How to run the continuation of a task in the taskBuilder CE in the same thread of the code before the let! operation?


I am coding a long running operation in FSharp's taks CE as follows

let longRunningTask = Task.Run(...)


// Now let's do the rest of the multi-tasking program

task {
  DO SOMETHING
  let! result = longRunningTask
  DO SOMETHING ELSE
}

The problem is DO SOMETHING ELSE appears to be running on an arbitrary thread (as observed also by printing the current thread id), whereas I absolutely need it to run on the same thread as DO SOMETHING, as I don't want any other form of concurrency except for the longRunningTask.

I've tried in many ways to set the current synchronization context, creating first a unique value of that type, but that doesn't seem to affect the result.


Solution

  • It might be an overkill, but SynchronizationContext may help you. It's used to dispatch delegates to some threads. There's a lot of explanations on how it's working (search for ConfigureAwait(false)), so I'll focus on implementation

    type ThreadOwningSyncCtx() =
        inherit SynchronizationContext()
    
        let _queue = new BlockingCollection<(SendOrPostCallback * obj)>()
    
        member _.DoWork(cancellationToken: CancellationToken) =
            while not cancellationToken.IsCancellationRequested do
                let (callback, state) = _queue.Take()
                callback.Invoke state
            ()
    
        override _.Post(callback, state) =
            _queue.Add((callback, state))
            ()
    
        override _.Send(callback, state) =
            let tcs = TaskCompletionSource()
            let cb s =
                callback.Invoke s
                tcs.SetResult()
            _queue.Add((cb, state))
            tcs.Task.Wait()
            ()
    

    Notes on methods:

    Usage:

    let syncCtx = ThreadOwningSyncCtx()
    // set current sync ctx, so every continuation is queued back to main thread.
    // comment this line and `printThreadId` will return different numbers
    SynchronizationContext.SetSynchronizationContext syncCtx
    
    let printThreadId() =
        printfn "%d" Thread.CurrentThread.ManagedThreadId
    
    // create cancellation token, so app won't run indefinitely
    let cts = new CancellationTokenSource()
    
    // task to simulate some meaningful work
    task {
        printThreadId()
        do! Task.Yield() // this action always completes asynchronously
        printThreadId()
    
        cts.Cancel() // cancel token, so main thread can continue it's work
    } |> ignore
    
    // process all pending continuations
    syncCtx.DoWork(cts.Token)