Let's say, for example:
This is one nice event-listening module! But it uses threads, and there happen to be no async alternatives!
So one decides to mix it with their long-time favorite async API, Trio.
One could come up with the idea of using Trio's memory channels, remembering how asyncio
was able to do this.
"""
Watchdog + asyncio MRE
"""
import pathlib
import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Relay file-creation events into an asyncio queue via the event loop."""

    def __init__(self, queue: asyncio.Queue):
        # Capture the running loop at construction time so that on_created
        # can later schedule work onto it.
        self._loop = asyncio.get_running_loop()
        # Only touched through put_safe, which is scheduled onto the loop.
        self._non_threadsafe_queue = queue

    def put_safe(self, val):
        """Enqueue ``val`` without waiting; drop it when the queue is full."""
        target = self._non_threadsafe_queue
        try:
            target.put_nowait(val)
        except asyncio.QueueFull:
            # Dropping the value relieves backpressure.
            return

    def on_created(self, event: FileSystemEvent) -> None:
        """Schedule the created path to be queued on the event loop."""
        created_path = event.src_path
        self._loop.call_soon_threadsafe(self.put_safe, created_path)
async def main():
    """Print the path of every file created in this script's directory.

    A watchdog observer feeds created paths into a bounded asyncio queue
    through ``SomeFileCreationDetector``; this coroutine consumes the queue
    until it is cancelled (or a falsy path is received).
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create queue
    queue = asyncio.Queue(1024)

    # prepare handler & schedule for file system event
    handler = SomeFileCreationDetector(queue)
    observer.schedule(handler, str(actual_cwd), recursive=False)

    # Start *before* entering the try block: if start() itself fails, the
    # observer thread never ran, and calling join() on it in ``finally``
    # would raise RuntimeError and hide the real error.
    observer.start()
    try:
        while str_path := await queue.get():
            # do some stuff with path...
            print(str_path)
    finally:
        # make sure we stop the observer
        observer.stop()
        observer.join()


if __name__ == '__main__':
    asyncio.run(main())
Which indeed works well.
X:\hina.png
X:\black.png
...
However, a similar attempt with Trio:
"""
Watchdog + Trio Minimum reproducible example
for listening file creation event
"""
import pathlib
import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Push created-file paths straight into a Trio memory send channel."""

    def __init__(self, send_ch: trio.MemorySendChannel):
        self.send_ch = send_ch

    def on_created(self, event: FileSystemEvent) -> None:
        """Send the created path without waiting."""
        created_path = event.src_path
        # NOTE: send_nowait is invoked directly from whichever thread calls
        # this handler; the accompanying text reports a RuntimeError here.
        self.send_ch.send_nowait(created_path)
async def main():
    """Watch this script's directory and print each created path."""
    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    watch_dir = pathlib.Path(__file__).parent.absolute()
    watcher = Observer()

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        watcher.schedule(
            SomeFileCreationDetector(send_ch),
            str(watch_dir),
            recursive=False,
        )

        try:
            watcher.start()

            async for created in recv_ch:
                # do some stuff with path...
                print(created)
        finally:
            # make sure we stop the observer
            watcher.stop()
            watcher.join()


if __name__ == '__main__':
    trio.run(main)
...is seemingly impossible, because that other thread is not one of Trio's threads.
...
RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...
Nor does a thread spawned outside the async context work anyway (even when ignoring the non-thread-safe nature of calling Trio from a thread Trio did not create), which is the case for the threading-based event listener libraries out there.
"""
Trio's behavior comparison between threads spawn location
Demonstration of non-async context spawned thread failing to call trio's methods
"""
import time
import threading
import trio
def trio_thread(send_ch: trio.MemorySendChannel):
    """Send a greeting into ``send_ch`` through ``trio.from_thread.run_sync``."""
    greeting = "Trio Thread: Yay!"
    trio.from_thread.run_sync(send_ch.send_nowait, greeting)
def thread_threadsafe_ignored(called_from: str):
    """Call ``send_nowait`` on the shared channel directly from this thread.

    Thread-safety is deliberately ignored. ``called_from`` only labels the
    messages. A failure is reported on stdout and then re-raised.
    """
    who = f"Non-Trio Thread in {called_from} context"

    # delay execution until channel is ready
    while True:
        if SomeNameSpace.send_ch is not None:
            break
        time.sleep(0.1)

    try:
        SomeNameSpace.send_ch.send_nowait(f"{who}: Yay!")
    except Exception:
        print(f"{who}: ded")
        raise
class SomeNameSpace:
    """Module-level holder through which the send channel is shared with threads."""

    # Stays None until the async side assigns its send channel.
    send_ch: "trio.MemorySendChannel | None" = None
# start thread outside async context
thread_spawned_in_sync_context = threading.Thread(
    target=thread_threadsafe_ignored,
    args=("non-async",),
)
thread_spawned_in_sync_context.start()
async def async_context():
    """Open the channel, start the competing threads, and print what arrives.

    Publishes the send channel through ``SomeNameSpace`` so the plain
    threads can reach it, runs ``trio_thread`` via ``trio.to_thread``, and
    starts one more plain thread from inside the async context.
    """
    send_ch, recv_ch = trio.open_memory_channel(1024)
    SomeNameSpace.send_ch = send_ch

    # Manage BOTH channel halves. Listing send_ch twice (as before) left
    # recv_ch without a context manager to close it.
    async with send_ch, recv_ch, trio.open_nursery() as nursery:
        # schedule trio thread
        nursery.start_soon(trio.to_thread.run_sync, trio_thread, send_ch)

        # start thread inside async context
        thread_spawned_in_async_context = threading.Thread(target=thread_threadsafe_ignored, args=["async"])
        thread_spawned_in_async_context.start()

        async for val in recv_ch:
            print(val)


trio.run(async_context)
Non-Trio Thread in async context: Yay!
Trio Thread: Yay!
Non-Trio Thread in non-async context: ded
Exception in thread Thread-1 (thread_threadsafe_ignored):
Traceback (most recent call last):
...
File "C:\Users\jupiterbjy\AppData\Local\Programs\Python\Python312\Lib\site-packages\trio\_core\_generated_run.py", line 126, in reschedule
raise RuntimeError("must be called from async context") from None
RuntimeError: must be called from async context
Surely this can be solved by using a thread-safe queue.Queue
and polling until it's not empty, but out of curiosity I am interested in what the most Trio-like solution for such a problem would be.
Here is the complete code of the Trio implementation applying Arthur Tacca's detailed answer below, for those who might need complete working toy code.
(merely a few lines changed)
"""
Watchdog + Trio toy code for listening file creation event
"""
import pathlib
import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Relay file-creation events into a Trio memory channel.

    Must be constructed from inside a running Trio task, because the
    ``TrioToken`` of the current run is captured in ``__init__`` and later
    used to schedule the send onto the Trio thread.
    """

    def __init__(self, send_ch: trio.MemorySendChannel):
        super().__init__()
        self.send_ch = send_ch
        self.trio_token = trio.lowlevel.current_trio_token()

    def _send_or_drop(self, path) -> None:
        """Send ``path`` without waiting; drop it if that is not possible.

        This runs as a ``run_sync_soon`` callback, where an escaping
        exception would bring down the whole Trio run, so nothing may leak.
        """
        try:
            self.send_ch.send_nowait(path)
        except trio.WouldBlock:
            # Buffer full: drop the value to relieve backpressure.
            pass
        except (trio.ClosedResourceError, trio.BrokenResourceError):
            # Channel already closed (shutdown in progress): nothing to do.
            pass

    def on_created(self, event: FileSystemEvent) -> None:
        """Schedule the created path to be sent from the Trio thread."""
        try:
            self.trio_token.run_sync_soon(self._send_or_drop, event.src_path)
        except trio.RunFinishedError:
            # The Trio run has ended; late events are discarded.
            pass
async def main():
    """Print the path of every file created in this script's directory.

    A watchdog observer hands created paths to ``SomeFileCreationDetector``,
    which forwards them into a Trio memory channel consumed here.
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        # Start *before* entering the try block: if start() itself fails,
        # the observer thread never ran, and calling join() on it in
        # ``finally`` would raise RuntimeError and hide the real error.
        observer.start()
        try:
            print("Observer started!")

            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)
        except KeyboardInterrupt:
            print("Terminating!")
        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()
            print("Observer thread stopped!")


if __name__ == '__main__':
    trio.run(main)
❯ py X:\scratch.py
Observer started!
X:\hina.png
X:\Horizon Claire cover.png
Terminating!
Observer thread stopped!
The clue to making trio.from_thread.run()
work is in the error message that you posted: "RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...". If you follow the breadcrumb trail by searching the Trio docs for trio_token
(actually the class is TrioToken
) you can make that function work. The docs for trio.from_thread.run()
include a section "Locating a TrioToken" though annoyingly it doesn't point you directly at the trio.lowlevel.current_trio_token()
function you need.
It's actually simpler to use trio.TrioToken.run_sync_soon()
directly (which is what trio.from_thread.run()
uses under the hood) since the function MemorySendChannel.send_nowait()
is sync anyway. Here's a simple example that puts the pieces together:
class MySimpleWorker:
    """Run a blocking ``ThreadedWorker`` in a thread and expose its values.

    Must be constructed from inside a running Trio task: the current
    ``TrioToken`` is captured so the thread can schedule sends onto the Trio
    thread. Values are read from the public ``receive_channel``.
    """

    def __init__(self, *worker_args):
        self._worker_args = worker_args
        self._trio_token = trio.lowlevel.current_trio_token()
        # Unbounded buffer, so send_nowait never has to wait for space.
        self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        """Thread body: forward values until the worker signals it is done."""
        worker_object = ThreadedWorker(*self._worker_args)
        while True:
            # TODO (if needed): exception handling
            next_val = worker_object.get_next()
            if next_val is None:  # Or however else the object signifies completion
                # Without an exit the close() below could never be reached.
                break
            self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)
        # Closing the send side ends the consumer's ``async for`` loop.
        self._trio_token.run_sync_soon(self._send_channel.close)
async def use_simple_worker():
    """Consume every value produced by a ``MySimpleWorker``."""
    async with trio.open_nursery() as nursery:
        # Can use nursery.start_soon to start unrelated tasks in the background
        worker = MySimpleWorker()
        incoming = worker.receive_channel
        async for item in incoming:
            process(item)
In a real application you probably would also want to shut down the object in the worker thread when the async application is shut down, which is slightly fiddlier. You need to spawn another task in the nursery which shuts down the thread when it is cancelled and waits for that to complete. Since it's not possible to wait until a channel is shut down without waiting for values in it, I've used a separate trio.Event
to signal shutdown completion. Although not strictly necessary, it also makes sense to spawn the thread in this new routine. Here's what that would look like:
class MyWorker:
    """Run a blocking ``ThreadedWorker`` in a thread with orderly shutdown.

    Must be constructed from inside a running Trio task (the current
    ``TrioToken`` is captured). Start ``run_background_thread`` in a nursery;
    values are read from the public ``receive_channel``.
    """

    def __init__(self, *worker_args):
        self._trio_token = trio.lowlevel.current_trio_token()
        # Set by the worker thread once it has observed shutdown.
        self._stopped_event = trio.Event()
        self._worker_object = ThreadedWorker(*worker_args)
        # Unbounded buffer, so send_nowait never has to wait for space.
        self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
        self._thread = None

    async def run_background_thread(self):
        """Ensures that the background thread is closed when nursery is shutdown.

        Raises:
            RuntimeError: if the background thread was already started.
        """
        # The thread must NOT exist yet. (The previous check was inverted and
        # therefore failed on the very first call.)
        if self._thread is not None:
            raise RuntimeError("background thread has already been started")
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()
        try:
            await trio.sleep_forever()
        finally:
            self._worker_object.stop()  # Needs to be thread safe stop method
            # Shielded so that the cancellation which brought us here does
            # not interrupt waiting for the thread's acknowledgement.
            with trio.CancelScope(shield=True):
                await self._stopped_event.wait()
            self._thread.join()

    def _thread_main(self):
        """Thread body: forward values until the worker signals shutdown."""
        while True:
            # TODO (if needed): exception handling (could capture and inject into run_background_thread)
            next_val = self._worker_object.get_next()
            if next_val is None:  # Or however else the object signifies shutdown
                self._trio_token.run_sync_soon(self._send_channel.close)
                self._trio_token.run_sync_soon(self._stopped_event.set)
                # Leave the loop so the thread ends and join() can return.
                return
            self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)
async def use_worker():
    """Consume every value produced by a ``MyWorker``, then shut it down."""
    async with trio.open_nursery() as n:
        # Can use n.start_soon to start unrelated tasks in the background
        w = MyWorker()
        n.start_soon(w.run_background_thread)
        async for data in w.receive_channel:
            process(data)
        # The channel was closed, so the worker is done. The background task
        # is still parked in sleep_forever(); cancel the nursery so it runs
        # its cleanup and the nursery can exit instead of hanging.
        # NOTE: this also cancels any other task started in this nursery.
        n.cancel_scope.cancel()