
How can I "wake up" an event loop to notify it that a Future was completed from another thread?


When using Python's asyncio, I often create and complete asyncio.Future objects from threads other than the one running the event loop.

Unless I complete those futures in the thread that is running the event loop or via a function that notifies that loop of the completion, the event loop often does not "notice" that the futures are completed.

Is there a way to "notify" an event loop that it should check a Future for completion if that future was readied (via set_result) externally?

Why I am asking this

The threads which ready futures need to a) have very low latency, and b) check whether the future has been readied, synchronously, later on (e.g. via future.done()).

The event loop awaiting the Futures does not need to have low latency in being notified that they're ready--it can be notified a few milliseconds late.

Ideally there would be a performant way to notify the event loop that a Future had been readied after readying it synchronously in a thread.

Even if that's not possible, the event loop could poll readiness on an interval, so long as the futures were synchronously readied as quickly as possible in threads.

What I have tried

The "correct" way to solve this problem is with call_soon_threadsafe, e.g.:

def do_in_thread(future):
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")

That reliably notifies the event loop that the Future is ready, but it does not work for me for two reasons:

  1. It has significant (8-10x) overhead versus calling future.set_result in my benchmarks.
  2. It doesn't ready the Future until the event loop runs, which means I can't reliably check if the Future is done, which I need to do. For example, this won't work:
def do_in_thread(future):
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")
    assert future.done()  # Fails: set_result has not run on the loop yet
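A runnable version of that check, as a minimal sketch. The `observed` list and the blocking `worker.join()` are illustrative additions: the join deliberately stalls the loop thread so the scheduled callback cannot run before the worker looks at `done()`, which makes the outcome deterministic rather than a race.

```python
import asyncio
import threading


def do_in_thread(future, observed):
    # Schedule set_result on the loop's own thread; this call returns immediately.
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")
    # Record what the worker thread sees right after scheduling.
    observed.append(future.done())


async def main():
    future = asyncio.get_running_loop().create_future()
    observed = []
    worker = threading.Thread(target=do_in_thread, args=(future, observed))
    worker.start()
    # Block the loop thread until the worker has finished, so the scheduled
    # callback cannot have run when the worker checked done().
    worker.join()
    result = await future  # the loop now runs the scheduled set_result
    return observed[0], result


done_in_worker, result = asyncio.run(main())
print(done_in_worker, result)
```

The worker sees the Future as still pending even though the loop later delivers the result, which is the behaviour described in point 2.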

One thing that does seem to work is to notify the event loop by intentionally failing a second call to set_result via call_soon_threadsafe, and swallowing the InvalidStateError, like this:

from asyncio import Future, InvalidStateError


def ensure_result(f, res):
    try:
        f.set_result(res)
    except InvalidStateError:
        pass  # the worker thread already set the result


def in_thread(fut: Future):
    fut.set_result("the result")
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")

That still has overhead, but I could avoid the cost of call_soon_threadsafe by tracking Futures in a thread-shared data structure and having the event loop call ensure_result on them periodically. However, I'm still not sure:

  1. Does that reliably work? Is a set_result call that fails with InvalidStateError guaranteed to notify the event loop that a coroutine awaiting the given Future can resume, or is that an undocumented implementation detail I'm relying on?
  2. Is there a better way to achieve that periodic wakeup that doesn't involve me keeping track of and polling such Futures myself?

In a perfect world, there would be a loop.poll_all_pending_futures() or loop.update_future_state(fut) method which would achieve this efficiently, but I don't know of one.
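The tracking-and-polling variant described above can be sketched roughly as follows. The names `readied` and `poll_readied` and the 2 ms interval are hypothetical, chosen only for illustration. Note that the asyncio documentation describes Future as not thread-safe, so calling `set_result` from a worker thread relies on the behaviour of CPython's default event loop outside debug mode; treat this as an experiment, not a supported pattern.

```python
import asyncio
import queue
import threading
from asyncio import InvalidStateError

# Hypothetical shared structure: futures already readied by worker threads.
readied = queue.SimpleQueue()


def ensure_result(f, res):
    try:
        f.set_result(res)
    except InvalidStateError:
        pass  # already readied by the worker thread


def in_thread(fut, res, observed):
    fut.set_result(res)          # synchronous, but off the loop thread
    observed.append(fut.done())  # the worker can check completion right away
    readied.put((fut, res))      # cheap hand-off; does not wake the loop


async def poll_readied(interval=0.002):
    # Periodic task on the loop thread: each wake-up lets the loop run pending
    # callbacks, then ensure_result is re-applied to every tracked future.
    while True:
        await asyncio.sleep(interval)
        while not readied.empty():
            ensure_result(*readied.get_nowait())


async def main():
    fut = asyncio.get_running_loop().create_future()
    observed = []
    poller = asyncio.create_task(poll_readied())
    worker = threading.Thread(target=in_thread, args=(fut, "the result", observed))
    worker.start()
    try:
        # The timeout only keeps the sketch from hanging if no wake-up occurs.
        result = await asyncio.wait_for(fut, timeout=5)
    finally:
        poller.cancel()
    worker.join()
    return observed[0], result


done_in_worker, result = asyncio.run(main())
print(done_in_worker, result)
```

In this sketch the worker's own `done()` check succeeds immediately, and the awaiting coroutine resumes within roughly one polling interval.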


Solution

  • Disclaimer: this may not be future proof or handle all event loop contexts.

    I ran into the same essential problem, and banged my head on the wall for a few days. I was using callbacks from a C++ extension to set the future result via code running in another system thread (created on the C side). I could set that future result, but the event loop didn't care unless I had other coroutines keeping it "alive" in that Python thread, and even then it was often slow.

    I solved it with the following approach, adapted here to your example code:

    import asyncio

    def in_thread(fut: asyncio.Future):
        fut.set_result("the result")
        # Scheduling a no-op coroutine thread-safely wakes the loop up.
        asyncio.run_coroutine_threadsafe(asyncio.sleep(0), fut.get_loop())
    

    The result was a delay of roughly 100 microseconds (micro, not milli) between setting the future and the event loop "waking up" and processing it.
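For anyone who wants to try this, here is a self-contained sketch of the same idea with a rough latency measurement. The `stamps` dict, the 50 ms sleep (to let the loop go idle first), and the 5 second timeout are illustrative additions. Like the snippet above, it sets the result from a non-loop thread, which the asyncio docs do not describe as thread-safe, so it assumes CPython's default event loop with debug mode off. The measured delay will vary by machine.

```python
import asyncio
import threading
import time


def in_thread(fut, stamps):
    time.sleep(0.05)  # let the loop go idle while main() awaits the future
    stamps["set"] = time.perf_counter()
    fut.set_result("the result")
    # Scheduling any coroutine thread-safely wakes the loop's selector.
    asyncio.run_coroutine_threadsafe(asyncio.sleep(0), fut.get_loop())


async def main():
    fut = asyncio.get_running_loop().create_future()
    stamps = {}
    worker = threading.Thread(target=in_thread, args=(fut, stamps))
    worker.start()
    # The timeout only keeps the sketch from hanging if the wake-up fails.
    result = await asyncio.wait_for(fut, timeout=5)
    stamps["woke"] = time.perf_counter()
    worker.join()
    return result, stamps["woke"] - stamps["set"]


result, latency = asyncio.run(main())
print(f"{result!r} delivered after {latency * 1e6:.0f} microseconds")
```

Removing the `run_coroutine_threadsafe` line should make the difference visible: the awaiting coroutine would then only resume when something else wakes the loop, which in this sketch is the 5 second timeout.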