Recently I read the code of asyncio and felt confused about loop.call_soon and loop.call_soon_threadsafe. The only difference I found is that there is a _write_to_self in call_soon_threadsafe. As we know, a event loop runs in a specific thread. All tasks scheduled by the event loop orderly. And we can assume tasks are thread safe while they were scheduled one by one. So how can thread unsafe situation happen in call_soon?
call_soon |
call_soon_threadsafe |
---|---|
self._check_closed() |
self._check_closed() |
if self._debug: |
if self._debug: |
self._check_thread() |
|
self._check_callback(callback, 'call_soon') |
self._check_callback(callback, 'call_soon_threadsafe') |
handle = self._call_soon(callback, args, context) |
handle = self._call_soon(callback, args, context) |
if handle._source_traceback: |
if handle._source_traceback: |
del handle._source_traceback[-1] |
del handle._source_traceback[-1] |
self._write_to_self() |
|
return handle |
return handle |
Briefly: The call_soon_threadsafe method will allow a non-event loop thread to run a callable in the event loop thread. This does the following:
The example below is relatively trivial but keep in mind what the docs mention... from Concurrency and Multithreading...
"... Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback. ..."
This means, if you have a non-event loop thread that wishes to touch objects that should not be touched outside of the event loop thread, you should run the callable in the context of the event loop thread by using call_soon_threadsafe
.
Yes, both call_soon_threadsafe and call_soon queue the work, but the call_soon_threadsafe variation will wake the event loop thread to process the callable in its thread. Without waking, the work would not be performed if the event loop thread is waiting (per the example if SHOW_MAIN_HANG=True).
Example demonstrating safe/unsafe difference...
Set SHOW_MAIN_HANG=True to see one example issue. See how SHOW_MAIN_HANG=False uses the event loop thread callback to properly set the event and wake the event loop thread to observe the signal.
import asyncio
import threading
import time
# Set SHOW_MAIN_HANG to True to see main loop hang due to unsafe event set.
# Set SHOW_MAIN_HANG to False to see main loop properly exit due to save event set.
# The term "safe" here means the main loop is awaked to process any work
# adjusted by non-asyncio threads such as in the example below.
# Most importantly, "safe" here means the callable is run in the context
# of the event loop (main loop below) thread, which means it can access any
# event loop items that woudl be unsafe to access from a different thread.
SHOW_MAIN_HANG = False
def print_thread_info_msg(msg):
print(
f"{msg}: "
f"name={threading.current_thread().name} "
f"native_id={threading.current_thread().native_id}"
)
def set_the_event(event: asyncio.Event):
print_thread_info_msg("Setting the event from callable")
event.set()
def other_thread_func(
main_loop: asyncio.AbstractEventLoop,
main_loop_event: asyncio.Event
):
# For example, this thread is doing other work.
# This thread's work is being performed while the
# main loop is both running tasks but also when the
# main loop might be doing nothing, waiting. This
# thread cannot predict if the main loop is running
# or waiting, nor should it have to conern itself
# with such details. Nevertheless, this thread wishes
# to signal the main loop when something happens, perhaps
# when there's work ready, or maybe to indicate the
# work is completed, the program should end. There
# many examples to choose from... this is just one
# abstract hypothetical.
print_thread_info_msg("other_thread_func: doing some other work")
time.sleep(3)
if SHOW_MAIN_HANG:
# This way sets the event but does not wake the main loop
# which means the event can be set but the main loop sits
# waiting, not checking the vent. This will cause main loop
# to hang, not knowing the event has been signaled.
print(f"other_thread_func: set (unsafe way)")
main_loop_event.set()
else:
# This way sets the event and wakes the main loop, which
# avoids the hang.
print(f"other_thread_func: set (safe way)")
main_loop.call_soon_threadsafe(set_the_event, main_loop_event)
async def main():
print_thread_info_msg("Main event loop thread")
main_loop = asyncio.get_running_loop()
main_loop_event = asyncio.Event()
thd = threading.Thread(target=other_thread_func, args=(main_loop, main_loop_event,))
thd.start()
print(f"main: wait")
the_task = asyncio.create_task(main_loop_event.wait())
await asyncio.wait([the_task])
print(f"main: wait completed")
asyncio.run(main())
print("program exit")
Example with SHOW_MAIN_HANG=False...
Main event loop thread: name=MainThread native_id=2924
main: wait
other_thread_func: doing some other work: name=Thread-7 (other_thread_func) native_id=8156
other_thread_func: set (safe way)
Setting the event from callable: name=MainThread native_id=2924
main: wait completed
program exit
Example with SHOW_MAIN_HANG=True...
Main event loop thread: name=MainThread native_id=19240
main: wait
other_thread_func: doing some other work: name=Thread-7 (other_thread_func) native_id=21088
other_thread_func: set (unsafe way)
(hangs here, does not exit)
As you observed, a primary difference is the safe variation calls _write_to_self. The _write_to_self method will wake the event loop thread. If that did not happen, the work could be queued but not processed by the event loop thread as can be seen in the example above when SHOW_MAIN_HANG=True.
From CPython ...\Lib\asyncio\base_events.py
...
def call_soon_threadsafe(self, callback, *args, context=None):
"""Like call_soon(), but thread-safe."""
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
self._write_to_self()
return handle
...