I want to exit from an existing gen.sleep state. The user will send an HTTP POST with a parameter to trigger this method.
I use a Tornado loop to start a process which refreshes some authentication/tokens every 30 minutes. In some situations the user revokes access while the app is still in the sleep state, hence I want to restart the workflow.
Example:
async def _loop(self):
    """Refresh the stored password repeatedly while the state is "ACTIVE".

    On each pass, when ``self.auth_enabled()`` is true, the password is
    fetched with ``_get_metadata()`` and any non-empty result is handed to
    ``_store_password()``. A failing pass is logged and followed by a pause
    of ``retry_secs``; a successful pass is followed by a pause of
    ``SLEEP_SECS``.
    """
    # NOTE(review): the original snippet had lost its indentation; the
    # try/except/else nesting below is the reconstruction that matches the
    # "retry on failure, long sleep on success" reading -- confirm.
    # NOTE(review): when auth_enabled() is false nothing is awaited inside
    # the loop, so it would spin without yielding -- verify against caller.
    while self.state() == "ACTIVE":
        if self.auth_enabled():
            try:
                password = await _get_metadata()
                if password:
                    _store_password(password)
            except Exception as e:
                # Log the failure and retry after the short back-off.
                logger.exception(e)
                await gen.sleep(retry_secs)
            else:
                logger.info("sleeping...")
                # Break pause here.
                await gen.sleep(SLEEP_SECS)
Class
class RefreshPassword:
    """Start/stop switch for the password refresh loop.

    ``start()`` flips the state to "ACTIVE" and schedules ``self._loop`` on
    the current IOLoop; ``stop()`` flips it to "STOPPED" and sets the
    internal event.
    """

    # NOTE(review): ``state()`` and ``_loop`` are called below but are not
    # defined in this excerpt of the class -- presumably declared elsewhere.

    def __init__(self):
        # Set by stop() and cleared by start().
        self._event = locks.Event()

    def start(self):
        """Activate the refresher; does nothing if it is already "ACTIVE"."""
        if self.state() != "ACTIVE":
            self._state = "ACTIVE"
            self._event.clear()
            # Run the refresh loop on the IOLoop instead of calling it inline.
            ioloop.IOLoop.current().add_callback(self._loop)

    def stop(self):
        """Deactivate the refresher; does nothing unless it is "ACTIVE"."""
        if self.state() == "ACTIVE":
            self._state = "STOPPED"
            self._event.set()
class PasswordHandler(web.RequestHandler):
    """Handles HTTP requests"""

    SUPPORTED_METHODS = ["GET", "POST"]

    def initialize(self, calendar: RefreshPassword):
        """Initializes RefreshPassword calendar object.

        Tornado invokes ``initialize`` (not ``init``) with the keyword
        arguments registered for the handler, so the hook must carry this
        name for ``self._calendar`` to be set before ``post`` runs.

        Args:
            calendar(RefreshPassword): Keeps Password updated.
        """
        self._calendar = calendar

    # Backward-compatible alias for the previous method name.
    init = initialize

    async def post(self):
        """Start the refresh workflow, restarting it when asked to.

        If the workflow is already "ACTIVE" it is only stopped and started
        again when the ``restart`` argument equals "true"
        (case-insensitive); otherwise the request is answered with 409.
        A successful (re)start is answered with 201.
        """
        if self._calendar.state() == "ACTIVE":
            # A default is supplied, so get_argument never raises
            # MissingArgumentError and no handler for it is needed.
            restart = self.get_argument("restart", default="false")
            if restart.lower() != "true":
                msg = "Refresh process is already active"
                logger.warning(msg)
                self.set_status(409)
                self.write(msg)
                self.finish()
                return
            logger.info("Stopping refresh process")
            self._calendar.stop()
        # Initialize workflow
        self._calendar.start()
        msg = "Refresh process started"
        self.set_status(201)
        self.write(msg)
        self.finish()
I have tried the following, but tornado gets blocked on sleep.
async def _wait(self):
    """Pause for ``SLEEP_SECS`` or until ``self._event`` is set.

    Whichever finishes first ends the wait. If the event was set, it is
    cleared again so that the next wait can be interrupted as well.
    """
    # asyncio.wait needs futures: pass the future returned by Event.wait(),
    # not the Event object itself.
    sleeper = asyncio.ensure_future(gen.sleep(SLEEP_SECS))
    interrupted = asyncio.ensure_future(self._event.wait())
    done, pending = await asyncio.wait(
        [sleeper, interrupted],
        return_when=asyncio.FIRST_COMPLETED,
    )
    # Drop the future that lost the race so waiters do not pile up.
    for leftover in pending:
        leftover.cancel()
    logger.info("Wait done: %s", done)
    if self._event.is_set():
        logger.info("Wait interrupted")
        self._event.clear()
    else:
        logger.info("Wait completed")
async def _refresh(self):
    """Suspend until ``self._wait()`` finishes."""
    # Wait for the collection of any task to be completed.
    # This allows restarting workflow when we are waiting for refresh.
    # If self._event is completed, exit and interrupt sleep event.
    # Await the wait directly: scheduling it with add_callback would return
    # immediately, so the caller would never actually pause here.
    await self._wait()
That's not a good design. You need to queue up a callback in N seconds, which allows you to return to the main loop so it can continue processing. You can't do sleeps. You must return, just like in a GUI application.
You can use ioloop.IOLoop.current().add_timeout
to request a callback after a period of time.