import trio

work_available = trio.Event()

async def get_work():
    while True:
        work = check_for_work()
        if not work:
            await work_available.wait()
        else:
            return work

def add_work_to_pile(...):
    global work_available
    ...
    if work_available.statistics().tasks_waiting:
        work_available.set()
        work_available = trio.Event()
In this Python-like code example I get work in bursts via add_work_to_pile(). The workers, which get work via get_work(), are slow, so most of the time add_work_to_pile() is called there will be no one waiting on work_available.
Which is better/cleaner/simpler/more pythonic/more trionic/more intended by the trio developers: checking whether anyone is waiting on the Event() via statistics().tasks_waiting, like in the example code, ...or... set()ting the Event() and creating a new one each time? (Most of them in vain.)

Furthermore... the API does not really seem to expect regular code to check if someone is waiting via this statistics() call...

I don't mind spending a couple more lines to make things clearer. But that goes both ways: a couple CPU cycles more are fine for simpler code...
Creating a new Event is roughly the same cost as creating the _EventStatistics object within the statistics method. You'll need to profile your own code to pick out any small difference in performance. However, although it is safe and performant, the intent of statistics across trio's classes is debugging rather than core logic. Using and discarding many Event instances would be more in line with the intent of the devs.
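A sketch of that second option, derived from your own example (get_work() is unchanged, and the work pile itself stays a placeholder):

import trio

work_available = trio.Event()

# get_work() stays exactly as in your example

def add_work_to_pile():
    global work_available
    ...
    # wake any waiters unconditionally and hand out a fresh Event;
    # most of the time this just creates a cheap throwaway object
    work_available.set()
    work_available = trio.Event()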
A more trionic pattern would be to load each work item into a buffered memory channel, in place of your add_work_to_pile() method, and then iterate over the channel in the task that awaits get_work. I feel the amount of code is comparable to your example:
import trio

send_chan, recv_chan = trio.open_memory_channel(float('inf'))

async def task_that_uses_work_items():
    # # compare
    # while True:
    #     work = await get_work()
    #     handle_work(work)
    async for work in recv_chan:
        handle_work(work)

def add_work_to_pile():
    ...
    for work in new_work_set:
        send_chan.send_nowait(work)

# maybe your work is coming in from a thread?
def add_work_from_thread():
    ...
    for work in new_work_set:
        trio_token.run_sync_soon(send_chan.send_nowait, work)
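The trio_token above isn't defined in the snippet; my assumption is that you would capture it once inside the trio thread and hand it to the producer thread:

# inside the trio thread, before the producer thread starts
trio_token = trio.lowlevel.current_trio_token()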
Furthermore, this approach is performant, because the work items are efficiently rotated through a deque internally. This code would checkpoint for every work item, so you may have to jump through some hoops if you want to avoid that.
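One possible shape for that (my sketch, not the only way) is to await the first item and then drain whatever is already buffered with receive_nowait() before handling the whole batch; send_chan, recv_chan and handle_work() are the same names as above:

import trio

send_chan, recv_chan = trio.open_memory_channel(float('inf'))

async def task_that_uses_work_batches():
    while True:
        # one checkpoint per batch: block until at least one item arrives...
        batch = [await recv_chan.receive()]
        # ...then grab everything else already queued without checkpointing
        while True:
            try:
                batch.append(recv_chan.receive_nowait())
            except trio.WouldBlock:
                break
        for work in batch:
            handle_work(work)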