I am using an API client supplied by a vendor (Okta) that has very poor, outdated examples of running with async. For example, this one calls asyncio.get_event_loop(), which the Python documentation discourages in application code in favor of asyncio.run():
from okta.client import Client as OktaClient
import asyncio

async def main():
    client = OktaClient()
    users, resp, err = await client.list_users()
    while True:
        for user in users:
            print(user.profile.login)  # Add more properties here.
        if resp.has_next():
            users, err = await resp.next()
        else:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
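The vendor example only needs its last two lines replaced with asyncio.run(main()), which creates a new event loop, runs the coroutine to completion, and closes the loop. Below is a self-contained sketch of the same pagination loop driven by asyncio.run(). FakeClient and FakeResponse are hypothetical stand-ins (not part of the Okta SDK) that only mimic the (items, resp, err) tuple and the has_next()/next() calls shown above, so the sketch runs without Okta credentials; main() takes the client as a parameter purely to make it testable.

```python
import asyncio


# Hypothetical stand-ins mimicking the response shape used in the vendor example.
class FakeResponse:
    def __init__(self, pages):
        self._pages = pages  # pages still to be fetched after the first one

    def has_next(self):
        return bool(self._pages)

    async def next(self):
        # Mirrors `users, err = await resp.next()`: (next page, error slot)
        return self._pages.pop(0), None


class FakeClient:
    def __init__(self, pages):
        self._pages = list(pages)

    async def list_users(self):
        # Mirrors `users, resp, err = await client.list_users()`
        return self._pages[0], FakeResponse(self._pages[1:]), None


async def main(client):
    logins = []
    users, resp, err = await client.list_users()
    while True:
        for user in users:
            logins.append(user)  # fake users are plain strings here
        if resp.has_next():
            users, err = await resp.next()
        else:
            break
    return logins


if __name__ == "__main__":
    # asyncio.run() replaces the get_event_loop()/run_until_complete() pair
    print(asyncio.run(main(FakeClient([["alice", "bob"], ["carol"]]))))
```

With the real SDK, the equivalent is simply to keep the original main() and call asyncio.run(main()) at the bottom of the script.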
This works, but I need to go through the returned results and follow various links to get additional information. I created a queue using asyncio, and the worker loops until the queue is empty. This also works.
I start running into issues when I use more than one worker: if the code throws an exception, the workers never return.
import asyncio
import logging
import sys

import yaml
from okta.client import Client as OktaClient

log = logging.getLogger(__name__)
# NUM_WORKERS, metadata, data and the processor functions
# (e.g. process_policy) are defined elsewhere in the script.


async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        log.info("Queue size: %d", queue.qsize())
        api_req = await queue.get()
        log.info('Worker %s is handling %s', name, api_req)
        # Look up the client method (e.g. list_policies) and the processor function by name
        api_func = getattr(okta_client, f"list_{api_req['endpoint']}")
        api_proc = getattr(sys.modules[__name__], api_req['processor'])
        log.info('Worker %s is handling %s with api_func %s, api_proc %s', name, api_req, api_func, api_proc)
        resp_data, resp, err = await api_func(**api_req['params'])
        log.debug(resp_data)
        # Walk every page of results; processors may queue follow-up requests
        while True:
            for i in resp_data:
                await api_proc(i, queue)
            if resp.has_next():
                resp_data, err = await resp.next()
            else:
                break
        queue.task_done()


async def create_workers(queue: asyncio.Queue):
    """Reusable worker creation process"""
    log.info('Creating workers')
    workers = []
    async with OktaClient() as okta_client:
        for i in range(NUM_WORKERS):
            log.info('Creating worker-%d', i)
            worker = asyncio.create_task(handle_queue(f'worker-{i}', queue, okta_client))
            workers.append(worker)
        # Wait until every queued request has been marked done
        await queue.join()
        # The workers loop forever, so cancel them once the queue is drained
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)


async def main():
    """Load Access Policies and their mappings and rules"""
    queue = asyncio.Queue()
    queue.put_nowait({'endpoint': 'policies', 'params': {'query_params': {'type': 'ACCESS_POLICY'}}, 'processor': 'process_policy'})
    await create_workers(queue)
    metadata['policy_count'] = len(data)
    print(yaml.dump({'_metadata': metadata, 'data': data}))


if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Hide the exception for a Ctrl-C
        log.info('Keyboard Interrupt')
If an exception is thrown in handle_queue (or any of the functions it calls), the program hangs. When I hit Ctrl-C, I get the exception along with an asyncio message saying "Task exception was never retrieved". I understand this is because queue.join() is waiting for queue.task_done() to be called as many times as queue.put() was called, but I don't understand why the exception isn't caught.
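A stripped-down sketch of the hang: a worker raises before calling task_done(), so the queue's unfinished-task counter never reaches zero and queue.join() waits forever. Here asyncio.wait_for() adds a timeout so the demo terminates, and task.exception() shows that the error was stored on the finished task all along; retrieving it this way is also what prevents the "never retrieved" warning. The worker and the "bad" item are hypothetical.

```python
import asyncio


async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item == "bad":  # hypothetical failure standing in for an API error
            # Raised before task_done(), so join() can never complete
            raise ValueError(f"cannot process {item!r}")
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    for item in ["ok", "bad", "ok"]:
        queue.put_nowait(item)
    task = asyncio.create_task(worker(queue))
    try:
        # Without the timeout this await would never return
        await asyncio.wait_for(queue.join(), timeout=0.5)
        outcome = "joined"
    except asyncio.TimeoutError:
        outcome = "timed out"
    # The worker's exception is kept on the task until someone retrieves it
    error = task.exception() if task.done() else None
    return outcome, error, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With several workers the surviving ones drain the rest of the queue, but the failed item's task_done() is still missing, so join() hangs in the same way.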
I tried wrapping the work in handle_queue in a try:
async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        try:
            ...  # REST OF THE FUNCTION
        except Exception as e:
            queue.task_done()
            raise e
        queue.task_done()
This way, the program execution does finish, but the exception still disappears. How can I capture the exception and still allow the program to finish?
For printing the error, Python's traceback module is especially helpful. Add import traceback to your imports and then use it like so:
import traceback


async def handle_queue(name, queue: asyncio.Queue, okta_client: OktaClient):
    """Handle queued API requests"""
    while True:
        try:
            ...  # REST OF THE FUNCTION
        except Exception as e:
            # Report the error but keep the worker alive for the next item
            print(repr(e))
            print(traceback.format_exc())
        queue.task_done()
This prints to stdout rather than stderr, but the output looks like a standard Python stack trace. If you want it on stderr (e.g. for logging purposes), replace that print with print(traceback.format_exc(), file=sys.stderr) and import sys.
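Since the question's code already uses the logging module, another option is log.exception(), which logs at ERROR level and appends the current traceback. The sketch below also moves queue.task_done() into a finally clause so it runs exactly once per item whether processing succeeds or fails. The logger name, the items and the failure condition are hypothetical; queue.get() is deliberately kept outside the try so that cancelling an idle worker does not trigger an extra task_done().

```python
import asyncio
import logging

log = logging.getLogger("queue-demo")  # hypothetical logger name


async def handle_queue(name, queue: asyncio.Queue, results: list):
    while True:
        # Outside the try: a cancellation here must not call task_done()
        item = await queue.get()
        try:
            if item < 0:  # hypothetical failure standing in for an API error
                raise ValueError(f"negative item {item}")
            results.append(item * 2)
        except Exception:
            # Logs the message at ERROR level plus the full traceback
            log.exception("Worker %s failed on %r", name, item)
        finally:
            # Runs on success and failure, so join() always gets its task_done()
            queue.task_done()


async def run(items, num_workers=2):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    results = []
    workers = [asyncio.create_task(handle_queue(f"worker-{i}", queue, results))
               for i in range(num_workers)]
    await queue.join()  # returns even though one item raised
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(sorted(asyncio.run(run([1, -1, 3]))))
```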
It is worth noting that asyncio.gather() with return_exceptions=True does not raise a worker's exception; it returns the exception object as one of the results instead (this may be your intended behavior). That is why the exception disappears in your second version: create_workers never looks at what gather() returns. If you want the program to keep running after an exception escapes the try block, keep return_exceptions=True, but inspect the results and handle any exceptions you find there. In general, an exception raised inside an asyncio task is stored on the task and can go unnoticed unless you explicitly await the task or retrieve its result.
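A minimal sketch of inspecting what gather() returns, assuming Python 3.8+, where asyncio.CancelledError derives from BaseException rather than Exception, so filtering on Exception skips the workers that were merely cancelled. The worker mirrors the question's second version (mark the item done, then let the exception end the worker); the items and failure condition are hypothetical.

```python
import asyncio


async def failing_worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        try:
            if item == "bad":  # hypothetical failure
                raise RuntimeError(f"could not handle {item!r}")
        finally:
            # Same effect as calling task_done() in both the except and normal paths
            queue.task_done()


async def run_and_report():
    queue = asyncio.Queue()
    for item in ["ok", "bad", "ok", "ok"]:
        queue.put_nowait(item)
    workers = [asyncio.create_task(failing_worker(queue)) for _ in range(2)]
    await queue.join()  # the surviving worker drains the remaining items
    for w in workers:
        w.cancel()  # no effect on a worker that already died
    results = await asyncio.gather(*workers, return_exceptions=True)
    # Cancelled workers appear as CancelledError instances; keep real failures only
    errors = [r for r in results if isinstance(r, Exception)]
    for err in errors:
        print(f"worker failed: {err!r}")
    return errors


if __name__ == "__main__":
    asyncio.run(run_and_report())
```

If a failure should stop the program instead, raise one of the collected errors after gather() returns. Note that a worker which re-raises stops taking items, so if every worker dies while items remain, join() still hangs; the try/except version above that does not re-raise avoids this.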