When using Python's asyncio, I often create and complete asyncio.Future objects from threads other than the one running the event loop.
Unless I complete those futures in the thread that is running the event loop or via a function that notifies that loop of the completion, the event loop often does not "notice" that the futures are completed.
Is there a way to "notify" an event loop that it should check a Future for completion if that future was readied (via set_result) externally?
The threads which ready futures need to a) have very low latency, and b) check whether the future has been readied, synchronously, later on (e.g. via future.done()).
The event loop awaiting the Futures does not need to have low latency in being notified that they're ready; it can be notified a few milliseconds late.
Ideally there would be a performant way to notify the event loop that a Future had been readied after readying it synchronously in a thread.
Even if that's not possible, the event loop could poll readiness on an interval, so long as the futures were synchronously readied as quickly as possible in threads.
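The interval-polling fallback could look roughly like the sketch below. It leans on CPython behaviour rather than a documented guarantee (asyncio.Future is documented as not thread-safe). As far as I can tell, a direct fut.set_result from another thread queues the awaiting task's wake-up through the loop's non-thread-safe call_soon, which does not interrupt a loop that is blocked waiting for I/O. A task that sleeps in a short loop bounds how long that wait can last. The names heartbeat, complete_in_thread and POLL_INTERVAL are hypothetical.

```python
import asyncio
import threading

POLL_INTERVAL = 0.005  # hypothetical: 5 ms of acceptable notification latency


async def heartbeat(interval: float = POLL_INTERVAL) -> None:
    # Waking every `interval` seconds bounds how long the loop can stay
    # blocked in its selector, so callbacks queued from other threads
    # (by a direct fut.set_result) get run within roughly one interval.
    while True:
        await asyncio.sleep(interval)


def complete_in_thread(fut: asyncio.Future, value) -> None:
    # Direct, synchronous completion from a non-loop thread. Relies on
    # CPython internals; may raise RuntimeError in asyncio debug mode,
    # where loop.call_soon checks that it runs on the loop's thread.
    fut.set_result(value)
    assert fut.done()  # readied synchronously, as required


async def main() -> str:
    loop = asyncio.get_running_loop()
    ticker = asyncio.create_task(heartbeat())
    fut = loop.create_future()
    threading.Thread(target=complete_in_thread, args=(fut, "the result")).start()
    try:
        return await fut
    finally:
        ticker.cancel()


result = asyncio.run(main())
print(result)
```

The trade-off is a loop that never fully sleeps: it wakes every POLL_INTERVAL even when nothing is happening.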
The "correct" way to solve this problem is with call_soon_threadsafe, e.g.:

    import asyncio

    def do_in_thread(future: asyncio.Future):
        future.get_loop().call_soon_threadsafe(future.set_result, "the result")
That notifies the event loop of Future readiness reliably, but does not work for two reasons:
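1. It has more overhead than calling future.set_result directly, in my benchmarks.
2. The Future isn't readied synchronously: set_result only runs once the event loop gets to the scheduled callback, so the thread can't check completion right afterwards:

    import asyncio

    def do_in_thread(future: asyncio.Future):
        future.get_loop().call_soon_threadsafe(future.set_result, "the result")
        assert future.done()  # Fails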
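The second problem can be reproduced deterministically with the sketch below. The observed list and the blocking t.join() are illustration-only assumptions: joining a thread from inside a coroutine blocks the event loop, which is exactly what guarantees that the scheduled set_result has not run yet when the worker checks done(). Real code should not block the loop like this.

```python
import asyncio
import threading


def do_in_thread(future: asyncio.Future, observed: list) -> None:
    # Only *schedules* set_result on the loop thread (and wakes the loop).
    future.get_loop().call_soon_threadsafe(future.set_result, "the result")
    # The loop thread is blocked in join() below, so the callback cannot
    # have run yet: done() is deterministically False here.
    observed.append(future.done())


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    observed = []
    t = threading.Thread(target=do_in_thread, args=(fut, observed))
    t.start()
    t.join()  # deliberately blocks the event loop to pin down the ordering
    return observed[0], await fut


done_in_thread, result = asyncio.run(main())
print(done_in_thread, result)  # False the result
```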
One thing that does seem to work is to notify the event loop by intentionally failing a second call to set_result via call_soon_threadsafe, and swallowing the InvalidStateError, like this:
    from asyncio import Future, InvalidStateError

    def ensure_result(f: Future, res):
        try:
            f.set_result(res)
        except InvalidStateError:
            pass

    def in_thread(fut: Future):
        fut.set_result("the result")
        fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")
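A runnable version of this workaround follows. As far as I can tell from CPython's asyncio source (not from any documented guarantee), the InvalidStateError plays no part in waking the loop: call_soon_threadsafe itself wakes a loop blocked in its selector (via an internal self-pipe), and that loop iteration also runs the wake-up callback that the direct fut.set_result had already queued. If that reading is right, a no-op callback does the same job; wake_loop and run are hypothetical names for that variant and for the test harness.

```python
import asyncio
import threading
from asyncio import Future, InvalidStateError


def ensure_result(f: Future, res) -> None:
    # Runs on the loop thread; normally f is already done, so set_result
    # raises InvalidStateError, which is swallowed.
    try:
        f.set_result(res)
    except InvalidStateError:
        pass


def in_thread(fut: Future) -> None:
    fut.set_result("the result")  # synchronous: fut.done() is now True
    assert fut.done()
    fut.get_loop().call_soon_threadsafe(ensure_result, fut, "the result")


def wake_loop(fut: Future) -> None:
    # Hypothetical variant: same direct completion, but wake the loop with
    # a no-op callback instead of a deliberately failing set_result.
    fut.set_result("the result")
    fut.get_loop().call_soon_threadsafe(lambda: None)


async def run(worker) -> str:
    fut = asyncio.get_running_loop().create_future()
    t = threading.Thread(target=worker, args=(fut,))
    t.start()
    result = await fut
    # Let the worker finish its call_soon_threadsafe before asyncio.run
    # closes the loop (otherwise the thread may hit "Event loop is closed").
    await asyncio.to_thread(t.join)
    return result


r1 = asyncio.run(run(in_thread))
r2 = asyncio.run(run(wake_loop))
print(r1, r2)
```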
That still has overhead, but I could remove the overhead of calling call_soon_threadsafe by tracking Futures in a thread-shared data structure and occasionally polling calls to ensure_result. However, I'm still not sure: is a set_result call failing with InvalidStateError guaranteed to notify the event loop that a coroutine awaiting the given Future can return from its await, or is that an undocumented implementation detail I'm relying on?

In a perfect world, there would be a loop.poll_all_pending_futures() or loop.update_future_state(fut) method which would achieve this efficiently, but I don't know of one.
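The idea of cutting down on call_soon_threadsafe calls can also be approached without a fixed polling interval, by coalescing wake-ups: threads complete futures directly and only schedule a wake-up when none is already outstanding. LoopWaker is a hypothetical helper, and it depends on the same undocumented CPython behaviour as the ensure_result trick (waking the loop is what lets callbacks queued by a cross-thread set_result run), so treat it as a sketch rather than a guarantee.

```python
import asyncio
import threading


class LoopWaker:
    """Hypothetical helper that coalesces cross-thread wake-ups of one loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._lock = threading.Lock()
        self._wake_pending = False

    def _on_wake(self) -> None:
        # Runs on the loop thread. Clearing the flag lets the next
        # completion schedule a fresh wake-up.
        with self._lock:
            self._wake_pending = False

    def complete(self, fut: asyncio.Future, result) -> None:
        # Called from any thread. The future is readied synchronously...
        fut.set_result(result)
        # ...and at most one wake-up is outstanding at any time.
        with self._lock:
            if self._wake_pending:
                return
            self._wake_pending = True
        self._loop.call_soon_threadsafe(self._on_wake)


async def main(n: int = 50) -> list:
    loop = asyncio.get_running_loop()
    waker = LoopWaker(loop)
    futs = [loop.create_future() for _ in range(n)]
    threads = [
        threading.Thread(target=waker.complete, args=(f, i))
        for i, f in enumerate(futs)
    ]
    for t in threads:
        t.start()
    results = await asyncio.gather(*futs)
    # Let every worker finish before asyncio.run closes the loop.
    for t in threads:
        await asyncio.to_thread(t.join)
    return results


results = asyncio.run(main())
print(results[:5])  # [0, 1, 2, 3, 4]
```

A thread that finds a wake-up already pending skips call_soon_threadsafe; its queued callback was added before the pending _on_wake runs, so it is picked up in that iteration or the one immediately after.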
Disclaimer: this may not be future proof or handle all event loop contexts.
I ran into the same essential problem, and banged my head on the wall for a few days. I was using callbacks from a C++ extension to set the future result via code running in another system thread (created on the C side). I could set that future result, but the event loop didn't care unless I had other coroutines keeping it "alive" in that Python thread, and even then it was often slow.
I used this conceptual solution (posting against your example code) to solve it:
    import asyncio
    from asyncio import Future

    def in_thread(fut: Future):
        fut.set_result("the result")
        asyncio.run_coroutine_threadsafe(asyncio.sleep(0), fut.get_loop())
The result was a delay of roughly 100 microseconds (not milli, micro) between setting the future and the event loop waking up to process it.
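A self-contained version of this approach, for anyone who wants to try it, is below. As far as I can tell it works for the same reason as the question's ensure_result trick: asyncio.run_coroutine_threadsafe submits the coroutine through loop.call_soon_threadsafe, which wakes the loop, and the callbacks already queued by the direct set_result then run in that iteration. The returned concurrent.futures.Future is ignored. This sketch does not measure or reproduce the 100 microsecond figure.

```python
import asyncio
import threading
from asyncio import Future


def in_thread(fut: Future) -> None:
    fut.set_result("the result")  # readied synchronously in this thread
    # Submitting any coroutine goes through loop.call_soon_threadsafe,
    # which wakes the loop; the returned concurrent.futures.Future is unused.
    asyncio.run_coroutine_threadsafe(asyncio.sleep(0), fut.get_loop())


async def main() -> str:
    fut = asyncio.get_running_loop().create_future()
    worker = threading.Thread(target=in_thread, args=(fut,))
    worker.start()
    result = await fut
    # Make sure the worker has finished submitting before asyncio.run
    # closes the loop.
    await asyncio.to_thread(worker.join)
    return result


result = asyncio.run(main())
print(result)
```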