I have a long running task which is executing a coroutine that calls asyncio.sleep() with a large value.
In another task I need to know how much longer the long task will sleep. I can get the long task's Task object from asyncio.all_tasks(). How can I use this Task object to determine how much longer it will sleep?
In order to get that information, you can access internal data structures. The following works at least in Python versions from 3.9 to 3.14.
Big warning: it relies on behavior that is not documented and may change between releases. Absolutely not recommended.
import asyncio
async def sleeping_task():
await asyncio.sleep(10)
def get_sleep(task):
func = asyncio.futures._set_result_unless_cancelled
fut = task._fut_waiter
loop = asyncio.get_running_loop()
try:
handle = next(iter(
h for h in loop._scheduled if isinstance(h, asyncio.TimerHandle)
and h._callback is func and h._args[0] is fut
))
except StopIteration:
print("not found")
else:
print(f"sleep ends in {handle._when - loop.time():.1f} seconds")
async def main():
task = asyncio.create_task(sleeping_task())
await asyncio.sleep(0.5)
get_sleep(task)
task.cancel()
if __name__ == "__main__":
asyncio.run(main())