Tags: python-3.x, python-trio

Mixing trio with Thread based event listeners


Let's say, for example, watchdog:

It is a nice event-listening module! But it uses threads, and there happen to be no async alternatives.

And say one decided to mix it with their long-time favorite async library, Trio.

One could come up with the idea of using Trio's memory channels, remembering how asyncio was able to do this:

"""
Watchdog + asyncio MRE
"""

import pathlib

import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):
    """Forward file-creation events from watchdog's observer thread to an asyncio queue.

    Must be constructed while an asyncio event loop is running: the loop is
    captured here and later used from the observer thread to hop back onto
    the loop thread.
    """

    def __init__(self, queue: asyncio.Queue):
        # asyncio.Queue is not thread-safe, so it is only touched from the loop thread.
        self._non_threadsafe_queue = queue
        self._loop = asyncio.get_running_loop()

    def put_safe(self, val):
        """Enqueue ``val`` without waiting; silently drop it when the queue is full.

        Runs on the event loop thread (scheduled via call_soon_threadsafe), so
        checking ``full()`` and then putting cannot race with another producer.
        Dropping on overflow relieves backpressure instead of blocking.
        """
        target = self._non_threadsafe_queue
        if target.full():
            return
        target.put_nowait(val)

    def on_created(self, event: FileSystemEvent) -> None:
        """Watchdog callback (observer thread): schedule the path onto the loop thread."""
        created_path = event.src_path
        self._loop.call_soon_threadsafe(self.put_safe, created_path)


async def main():
    """Print the path of every file created in this script's directory, forever.

    File events are produced on watchdog's observer thread and delivered to
    this coroutine through a bounded asyncio.Queue.
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create queue (bounded: the handler drops events once it is full)
    queue = asyncio.Queue(1024)

    # prepare handler & schedule for file system event
    handler = SomeFileCreationDetector(queue)
    observer.schedule(handler, str(actual_cwd), recursive=False)

    # Start before entering try/finally: if start() raises, the observer
    # thread never ran, and observer.join() in the finally would raise a
    # second RuntimeError ("cannot join thread before it is started").
    observer.start()
    try:
        # Loop until cancelled rather than relying on the truthiness of
        # each received path.
        while True:
            str_path = await queue.get()
            # do some stuff with path...
            print(str_path)
    finally:
        # make sure we stop the observer
        observer.stop()
        observer.join()


if __name__ == '__main__':
    asyncio.run(main())

This indeed works well:

X:\hina.png
X:\black.png
...



However, a similar attempt with Trio:

"""
Watchdog + Trio Minimum reproducible example
for listening file creation event
"""

import pathlib

import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):
    """Naive attempt: push created-file paths straight into a Trio memory channel.

    ``on_created`` is invoked on watchdog's observer thread, not on Trio's
    thread, so calling ``send_nowait`` from there bypasses Trio entirely; it is
    not thread-safe and can raise RuntimeError. Intentionally left broken: this
    is the failing example.
    """

    def __init__(self, send_ch: trio.MemorySendChannel):
        self.send_ch = send_ch

    def on_created(self, event: FileSystemEvent) -> None:
        created_path = event.src_path
        self.send_ch.send_nowait(created_path)


async def main():
    """Watch this script's directory and print created-file paths from a Trio memory channel."""
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        # Start before entering try/finally: if start() raises, the observer
        # thread never ran, and observer.join() in the finally would raise a
        # second RuntimeError ("cannot join thread before it is started").
        observer.start()
        try:
            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)
        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()


if __name__ == '__main__':
    trio.run(main)

...seems impossible, because the other thread is not Trio's thread:

...
RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...

Nor does a thread spawned outside the async context work at all, even if we ignore the fact that touching Trio objects from a non-Trio thread isn't thread-safe. And spawning threads outside the async context is exactly what threading-based event-listener libraries out there do.

"""
Trio's behavior comparison between threads spawn location

Demonstration of non-async context spawned thread failing to call trio's methods
"""


import time
import threading
import trio


def trio_thread(send_ch: trio.MemorySendChannel):
    """Body of a thread started via trio.to_thread: re-enter Trio to send one message."""
    greeting = "Trio Thread: Yay!"
    trio.from_thread.run_sync(send_ch.send_nowait, greeting)


def thread_threadsafe_ignored(called_from: str):
    """Send on the shared channel from a plain thread, deliberately ignoring thread safety.

    Used to show which spawn contexts (sync vs. async) fail. Any exception is
    reported with a short message and then re-raised.
    """
    # Poll until async_context() has published the channel.
    while True:
        if SomeNameSpace.send_ch is not None:
            break
        time.sleep(0.1)

    channel = SomeNameSpace.send_ch
    try:
        channel.send_nowait(f"Non-Trio Thread in {called_from} context: Yay!")
    except Exception:
        print(f"Non-Trio Thread in {called_from} context: ded")
        raise


class SomeNameSpace:
    """Module-level holder for the channel shared with the plain demo threads."""

    # None until async_context() creates the channel inside trio.run().
    send_ch = None


# start thread outside async context (runs at import time, before trio.run)
thread_spawned_in_sync_context = threading.Thread(
    target=thread_threadsafe_ignored,
    args=["non-async"],
)
thread_spawned_in_sync_context.start()


async def async_context():
    """Create the shared channel, start the demo threads, and print whatever arrives.

    The receive loop only ends once every send handle is closed, which does
    not happen while the loop is running, so the demo runs until interrupted.
    """
    send_ch, recv_ch = trio.open_memory_channel(1024)
    # Publish the channel for the plain threads polling SomeNameSpace.send_ch.
    SomeNameSpace.send_ch = send_ch

    # Close both ends of the channel (receive AND send) when the block exits.
    async with recv_ch, send_ch, trio.open_nursery() as nursery:

        # schedule trio thread
        nursery.start_soon(trio.to_thread.run_sync, trio_thread, send_ch)

        # start thread inside async context
        thread_spawned_in_async_context = threading.Thread(target=thread_threadsafe_ignored, args=["async"])
        thread_spawned_in_async_context.start()

        async for val in recv_ch:
            print(val)


trio.run(async_context)
Non-Trio Thread in async context: Yay!
Trio Thread: Yay!
Non-Trio Thread in non-async context: ded
Exception in thread Thread-1 (thread_threadsafe_ignored):
Traceback (most recent call last):
...
  File "C:\Users\jupiterbjy\AppData\Local\Programs\Python\Python312\Lib\site-packages\trio\_core\_generated_run.py", line 126, in reschedule
    raise RuntimeError("must be called from async context") from None
RuntimeError: must be called from async context


Surely this can be solved by using a thread-safe queue.Queue and polling until it's not empty, but for curiosity's sake I'm interested in what the most Trio-like solution to such a problem would be.



Edit:

Here is the complete Trio implementation after applying Arthur Tacca's detailed answer below, for those who need complete, working toy code.

(It's merely a few-line change.)

"""
Watchdog + Trio toy code for listening file creation event
"""

import pathlib

import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):
    """Forward file-creation events from watchdog's observer thread into Trio.

    Must be constructed inside trio.run(): it captures the current TrioToken,
    the thread-safe handle used to schedule work onto Trio's thread.
    """

    def __init__(self, send_ch: trio.MemorySendChannel):
        self.send_ch = send_ch
        self.trio_token = trio.lowlevel.current_trio_token()

    def put_safe(self, val):
        """Send ``val`` without blocking; drop it if it cannot be delivered.

        Runs on Trio's thread via ``run_sync_soon``. An exception escaping a
        ``run_sync_soon`` callback crashes the whole Trio run, so these cases
        must be handled here:
        - a full buffer (trio.WouldBlock);
        - a channel that is already closed (Closed/BrokenResourceError).
        Dropping mirrors the backpressure relief of the asyncio version.
        """
        try:
            self.send_ch.send_nowait(val)
        except (trio.WouldBlock, trio.ClosedResourceError, trio.BrokenResourceError):
            pass

    def on_created(self, event: FileSystemEvent) -> None:
        # Called on watchdog's observer thread; hop over to Trio's thread.
        self.trio_token.run_sync_soon(self.put_safe, event.src_path)


async def main():
    """Print paths of files created in this script's directory until Ctrl+C."""
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        # Start before entering try/finally: if start() raises, the observer
        # thread never ran, and observer.join() in the finally would raise a
        # second RuntimeError ("cannot join thread before it is started").
        observer.start()
        try:
            print("Observer started!")

            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)

        except KeyboardInterrupt:
            print("Terminating!")

        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()
            print("Observer thread stopped!")


if __name__ == '__main__':
    trio.run(main)
❯ py X:\scratch.py
Observer started!
X:\hina.png
X:\Horizon Claire cover.png
Terminating!
Observer thread stopped!

Solution

  • The clue to making trio.from_thread.run() work is in the error message that you posted: "RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...". If you follow the breadcrumb trail by searching the Trio docs for trio_token (the class is actually TrioToken), you can make that function work. The docs for trio.from_thread.run() include a section "Locating a TrioToken", though annoyingly it doesn't point you directly at the trio.lowlevel.current_trio_token() function you need.

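    It's actually simpler to use trio.TrioToken.run_sync_soon() directly (which is what trio.from_thread.run() uses under the hood), since MemorySendChannel.send_nowait() is sync anyway. One caveat: if a callback passed to run_sync_soon() raises, Trio treats it as an internal error and crashes the whole run. An example is send_nowait() raising trio.WouldBlock on a full bounded channel. The examples below therefore use an unbounded buffer (math.inf). Here's a simple example that puts the pieces together: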

    class MySimpleWorker:
        """Run a blocking ThreadedWorker in a thread and expose its values as a Trio channel.

        Must be created inside trio.run() (it captures the current TrioToken).
        Consume values from ``receive_channel``. The channel is closed when
        ``get_next()`` returns None (the end-of-data signal) or raises, so an
        ``async for`` over it terminates.
        """

        def __init__(self, *worker_args):
            self._worker_args = worker_args
            self._trio_token = trio.lowlevel.current_trio_token()
            # Unbounded buffer: send_nowait() can never raise WouldBlock, which
            # matters because exceptions in run_sync_soon callbacks crash Trio.
            self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
            self._thread = threading.Thread(target=self._thread_main)
            self._thread.start()

        def _thread_main(self):
            # Runs in the background thread. Channels are not thread-safe, so
            # every channel operation is handed to Trio's thread via the token.
            try:
                worker_object = ThreadedWorker(*self._worker_args)
                while True:
                    # TODO (if needed): forward exceptions to the Trio side
                    next_val = worker_object.get_next()
                    if next_val is None:  # Or however else the object signifies the end
                        break
                    self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)
            finally:
                # Close even if the worker raised, so consumers don't wait forever.
                self._trio_token.run_sync_soon(self._send_channel.close)
    
    
    async def use_simple_worker():
        """Process every value produced by a MySimpleWorker."""
        async with trio.open_nursery() as nursery:
            # Can use nursery.start_soon to start unrelated tasks in the background
            worker = MySimpleWorker()
            async for item in worker.receive_channel:
                process(item)
    

    In a real application you would probably also want to shut down the object in the worker thread when the async application is shut down, which is slightly fiddlier. You need to spawn another task in the nursery that shuts down the thread when it is cancelled and waits for that to complete. Since it's not possible to wait until a channel is closed without waiting for values in it, I've used a separate trio.Event to signal shutdown completion. Although not strictly necessary, it also makes sense to spawn the thread in this new routine. Here's what that would look like:

    class MyWorker:
        """Run a ThreadedWorker in a background thread whose lifetime is tied to a nursery.

        Usage:
        1. Create inside trio.run().
        2. Start ``run_background_thread`` in a nursery.
        3. Iterate ``receive_channel``.

        The thread is stopped when that task is cancelled. The task also
        returns on its own once the worker signals the end of data.
        """

        def __init__(self, *worker_args):
            self._trio_token = trio.lowlevel.current_trio_token()
            self._stopped_event = trio.Event()
            self._worker_object = ThreadedWorker(*worker_args)
            # Unbounded so send_nowait() inside a run_sync_soon callback never raises WouldBlock.
            self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
            self._thread = None

        async def run_background_thread(self):
            """Ensures that the background thread is closed when nursery is shutdown."""
            assert self._thread is None, "run_background_thread() may only be started once"
            self._thread = threading.Thread(target=self._thread_main)
            self._thread.start()
            try:
                # Returns when the worker finishes by itself, so the nursery can
                # exit instead of waiting on a task that sleeps forever.
                await self._stopped_event.wait()
            finally:
                if not self._stopped_event.is_set():
                    # Cancelled while the worker was still running.
                    self._worker_object.stop()  # Needs to be thread safe stop method
                with trio.CancelScope(shield=True):
                    await self._stopped_event.wait()
                    # The thread has already scheduled its final callbacks and is exiting.
                    self._thread.join()

        def _thread_main(self):
            # Runs in the background thread. Channels and Events are not
            # thread-safe, so every operation on them goes through the TrioToken.
            try:
                while True:
                    # TODO (if needed): capture exceptions and inject into run_background_thread
                    next_val = self._worker_object.get_next()
                    if next_val is None:  # Or however else the object signifies shutdown
                        break
                    self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)
            finally:
                # Always signal completion, even if get_next() raised; otherwise the
                # shielded wait in run_background_thread would hang forever.
                self._trio_token.run_sync_soon(self._send_channel.close)
                self._trio_token.run_sync_soon(self._stopped_event.set)
    
    
    async def use_worker():
        """Start a MyWorker's background thread in a nursery and process its output."""
        async with trio.open_nursery() as nursery:
            # Can use nursery.start_soon to start unrelated tasks in the background
            worker = MyWorker()
            nursery.start_soon(worker.run_background_thread)
            async for item in worker.receive_channel:
                process(item)