pythonthread-safetypython-asyncio

Difference between asyncio call_soon and call_soon_threadsafe? Why is call_soon thread-unsafe?


Recently I read the code of asyncio and felt confused about loop.call_soon and loop.call_soon_threadsafe. The only difference I found is that there is a _write_to_self in call_soon_threadsafe. As we know, a event loop runs in a specific thread. All tasks scheduled by the event loop orderly. And we can assume tasks are thread safe while they were scheduled one by one. So how can thread unsafe situation happen in call_soon?

call_soon call_soon_threadsafe
self._check_closed() self._check_closed()
if self._debug: if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon') self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args, context) handle = self._call_soon(callback, args, context)
if handle._source_traceback: if handle._source_traceback:
del handle._source_traceback[-1] del handle._source_traceback[-1]
self._write_to_self()
return handle return handle

Solution

  • Briefly: The call_soon_threadsafe method will allow a non-event loop thread to run a callable in the event loop thread. This does the following:

    1. If the event loop thread is waiting, call_soon_threadsafe wakes the event loop thread to process the callable. Without this, per the example below, the event loop thread will wait indefinitely and not process the callable.
    2. After waking the event loop, the event loop calls the callable in the context of the event loop thread. The callable is therefore run in the context of the event loop thread and can safely access any objects of the event loop thread as though they were accessed from the event loop code. In the failing case example below, we can see an "unsafe" event set is not observed by the event loop because the loop is not awakened.

    The example below is relatively trivial but keep in mind what the docs mention... from Concurrency and Multithreading...

    "... Almost all asyncio objects are not thread safe, which is typically not a problem unless there is code that works with them from outside of a Task or a callback. ..."

    This means, if you have a non-event loop thread that wishes to touch objects that should not be touched outside of the event loop thread, you should run the callable in the context of the event loop thread by using call_soon_threadsafe.

    Yes, both call_soon_threadsafe and call_soon queue the work, but the call_soon_threadsafe variation will wake the event loop thread to process the callable in its thread. Without waking, the work would not be performed if the event loop thread is waiting (per the example if SHOW_MAIN_HANG=True).

    Example demonstrating safe/unsafe difference...

    Set SHOW_MAIN_HANG=True to see one example issue. See how SHOW_MAIN_HANG=False uses the event loop thread callback to properly set the event and wake the event loop thread to observe the signal.

    import asyncio
    import threading
    import time
    
    # Set SHOW_MAIN_HANG to True to see main loop hang due to unsafe event set.
    # Set SHOW_MAIN_HANG to False to see main loop properly exit due to save event set.
    # The term "safe" here means the main loop is awaked to process any work 
    # adjusted by non-asyncio threads such as in the example below. 
    # Most importantly, "safe" here means the callable is run in the context
    # of the event loop (main loop below) thread, which means it can access any
    # event loop items that woudl be unsafe to access from a different thread.
    SHOW_MAIN_HANG = False
    
    def print_thread_info_msg(msg):
        print(
            f"{msg}: "
            f"name={threading.current_thread().name} "
            f"native_id={threading.current_thread().native_id}"
        )
    
    def set_the_event(event: asyncio.Event):
        print_thread_info_msg("Setting the event from callable")
        event.set()
    
    def other_thread_func(
        main_loop: asyncio.AbstractEventLoop,
        main_loop_event: asyncio.Event
    ):
        # For example, this thread is doing other work.
        # This thread's work is being performed while the
        # main loop is both running tasks but also when the
        # main loop might be doing nothing, waiting. This
        # thread cannot predict if the main loop is running
        # or waiting, nor should it have to conern itself
        # with such details. Nevertheless, this thread wishes
        # to signal the main loop when something happens, perhaps
        # when there's work ready, or maybe to indicate the
        # work is completed, the program should end. There
        # many examples to choose from... this is just one
        # abstract hypothetical.
        print_thread_info_msg("other_thread_func: doing some other work")
        time.sleep(3)
        if SHOW_MAIN_HANG:
            # This way sets the event but does not wake the main loop
            # which means the event can be set but the main loop sits
            # waiting, not checking the vent. This will cause main loop
            # to hang, not knowing the event has been signaled.
            print(f"other_thread_func: set (unsafe way)")
            main_loop_event.set()
        else:
            # This way sets the event and wakes the main loop, which
            # avoids the hang.
            print(f"other_thread_func: set (safe way)")
            main_loop.call_soon_threadsafe(set_the_event, main_loop_event)
    
    async def main():
        print_thread_info_msg("Main event loop thread")
        main_loop = asyncio.get_running_loop()
        main_loop_event = asyncio.Event() 
        thd = threading.Thread(target=other_thread_func, args=(main_loop, main_loop_event,))
        thd.start()
        print(f"main: wait")
        the_task = asyncio.create_task(main_loop_event.wait())
        await asyncio.wait([the_task])
        print(f"main: wait completed")
    
    asyncio.run(main())
    print("program exit")
    

    Example with SHOW_MAIN_HANG=False...

    Main event loop thread: name=MainThread native_id=2924
    main: wait
    other_thread_func: doing some other work: name=Thread-7 (other_thread_func) native_id=8156
    other_thread_func: set (safe way)
    Setting the event from callable: name=MainThread native_id=2924
    main: wait completed
    program exit
    

    Example with SHOW_MAIN_HANG=True...

    Main event loop thread: name=MainThread native_id=19240
    main: wait
    other_thread_func: doing some other work: name=Thread-7 (other_thread_func) native_id=21088
    other_thread_func: set (unsafe way)
        (hangs here, does not exit)
    

    As you observed, a primary difference is the safe variation calls _write_to_self. The _write_to_self method will wake the event loop thread. If that did not happen, the work could be queued but not processed by the event loop thread as can be seen in the example above when SHOW_MAIN_HANG=True.

    From CPython ...\Lib\asyncio\base_events.py

    ...
        def call_soon_threadsafe(self, callback, *args, context=None):
            """Like call_soon(), but thread-safe."""
            self._check_closed()
            if self._debug:
                self._check_callback(callback, 'call_soon_threadsafe')
            handle = self._call_soon(callback, args, context)
            if handle._source_traceback:
                del handle._source_traceback[-1]
            self._write_to_self()
            return handle
    ...