I have a heavy, time-consuming function called my_heavy_function, and I need to redirect its output to the web interface that calls it. I have a method for sending messages to the web interface; let's call it async push_message_to_user().
Basically, it's something like:
import time

def my_heavy_function():
    time_on = 0
    for i in range(20):
        time.sleep(1)
        print(f'time spend {time_on}')
        time_on = time_on + 1

async def push_message_to_user(message: str):
    # some lib implementation
    pass

if __name__ == "__main__":
    my_heavy_function()  # how do I push the prints to the UI?
Maybe there is a way to pass an output object, as in my_heavy_function(stdout_obj), and use that object (a StringIO, for example) to do something like stdout_obj.push(f'time spend {time_on}'). I can do that. What I can't do is replace my_heavy_function() with an async version that calls push_message_to_user() directly instead of print, because it's used by other non-async routines.
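For reference, a minimal sketch of the "pass an output object" variant might look like the following. The `out`, `steps` and `delay` parameters and the `PushingWriter` class are hypothetical names introduced only for this illustration, and the callback here is a plain synchronous one, so it does not yet solve the async part.

```python
import io
import sys
import time


def my_heavy_function(out=sys.stdout, steps=3, delay=0.0):
    # `out`, `steps` and `delay` are hypothetical parameters added for this sketch.
    for time_on in range(steps):
        time.sleep(delay)
        print(f'time spend {time_on}', file=out)


class PushingWriter(io.TextIOBase):
    """Hypothetical file-like object that forwards each printed line to a callback."""

    def __init__(self, callback):
        self.callback = callback

    def write(self, text):
        # print() writes the message and the trailing newline separately,
        # so whitespace-only chunks are skipped.
        if text.strip():
            self.callback(text.rstrip('\n'))
        return len(text)


received = []
my_heavy_function(out=PushingWriter(received.append))
```

Other callers can keep calling my_heavy_function() with no arguments and still get normal console output.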
What I would want is something like this (pseudocode):
with contextlib.redirect_output(my_prints):
    my_heavy_function()
    while my_prints.readable():
        # realtime push
        await push_message_to_user(my_prints)
Thanks!
Thanks to @HTF for the comment.
I finally managed to solve the problem with janus. I copied the example from the repo and modified it to receive a variable number of messages (because I don't know how many iterations my_heavy_function() will use):
import asyncio
import janus
import time

def my_heavy_function(sync_q):
    for i in range(10):
        sync_q.put(i)
        time.sleep(1)
    sync_q.put('end')  # is there a more elegant way to do this?
    sync_q.join()

async def async_coro(async_q):
    while True:
        val = await async_q.get()
        print(f'arrived {val}')
        # send val to UI
        # await push_message_to_user(val)
        async_q.task_done()
        if val == 'end':
            break

async def main():
    queue = janus.Queue()
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, my_heavy_function, queue.sync_q)
    await async_coro(queue.async_q)
    await fut
    queue.close()
    await queue.wait_closed()

asyncio.run(main())
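The janus version above changes my_heavy_function() to accept a queue. A possible alternative that leaves the function untouched, sketched here with the standard library only, is to capture its prints with contextlib.redirect_stdout and hand each line to an asyncio.Queue via loop.call_soon_threadsafe. This is an untuned sketch, not the janus approach: `QueueWriter`, `run_captured`, `_DONE` and the `steps`/`delay` parameters are hypothetical names for this example, and push_message_to_user() is a placeholder that just records messages. The unique `_DONE` object is one way to avoid the 'end' string as an end marker.

```python
import asyncio
import contextlib
import io
import time

_DONE = object()  # unique sentinel; cannot collide with a real message


def my_heavy_function(steps=3, delay=0.01):
    # Stand-in for the original function; `steps` and `delay` exist only in this sketch.
    for time_on in range(steps):
        time.sleep(delay)
        print(f'time spend {time_on}')


class QueueWriter(io.TextIOBase):
    """Hypothetical stdout replacement that passes each printed line to an asyncio.Queue."""

    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue

    def write(self, text):
        if text.strip():
            # write() runs in the worker thread, so schedule the put on the loop's thread.
            self.loop.call_soon_threadsafe(self.queue.put_nowait, text.rstrip('\n'))
        return len(text)


def run_captured(loop, queue):
    try:
        with contextlib.redirect_stdout(QueueWriter(loop, queue)):
            my_heavy_function()
    finally:
        # Always signal completion, even if the function raises.
        loop.call_soon_threadsafe(queue.put_nowait, _DONE)


pushed = []


async def push_message_to_user(message: str):
    pushed.append(message)  # placeholder for the real library call


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    fut = loop.run_in_executor(None, run_captured, loop, queue)
    while (msg := await queue.get()) is not _DONE:
        await push_message_to_user(msg)
    await fut


asyncio.run(main())
```

One caveat: redirect_stdout swaps sys.stdout for the whole process, so anything printed by other threads while my_heavy_function() is running would be captured as well.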