In the main()
program there is an intentional error which should throw an exception (in this case ZeroDivisionError
) but it's not. Yet there is nothing there to catch it. Instead it's cancelling some of the pending tasks and the program keeps running. If you run the code you will see that with the intentional error included the server takes two attempts to run (since the queued tasks are cancelled by the exception) but with the error commented out it will fire up on the first attempt. Shouldn't this error bring my program to a screeching halt?
async def main():
# logging
loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
await asyncio.sleep(0)
logger.info('servers program running')
tasks = set()
# socket server
socketServer = SocketServer(None)
tasks.add(socketServer.superviseTask)
result = 1/0 # <----- this should cause an exception but it's not
done, pending = await asyncio.wait(tasks)
Full code:
import os.path as ospath
import asyncio
from socket import gethostbyname, gethostname
import logging
from logging.handlers import QueueHandler, QueueListener
from queue import Queue
logger = logging.getLogger()
async def initLogging(format='%(levelname)8s - %(asctime)s - %(name)s - %(msg)s', consoleLevel=logging.INFO, fileLevel=logging.DEBUG, fileName='log.log'):
# Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
logger = logging.getLogger()
logger.setLevel(logging.NOTSET)
# Add file handler
fileHandler = logging.FileHandler(filename=fileName, mode='w')
fileHandler.setLevel(fileLevel)
fileHandler.setFormatter(logging.Formatter(format))
logger.addHandler(fileHandler)
# Add queue handler
queue = Queue()
queueHandler = QueueHandler(queue)
queueHandler.setLevel(logging.INFO)
queueHandler.setFormatter(logging.Formatter(format))
logger.addHandler(queueHandler)
listener = QueueListener(queue, logging.StreamHandler())
try:
listener.start()
logger.info('logger started')
while True:
await asyncio.sleep(10)
finally:
logger.info('logger finished')
listener.stop
# async server base class
class AsyncServer():
def __init__(self, app, name:str, type:str, maxRetryAttempts:int=-1, retryDelay:int=5, autoStart:bool=True) -> None:
self.app = app
self.name = name
self.type = type
self.retryCount = 0
self.maxRetryAttempts = maxRetryAttempts # default -1 = unlimited
self.retryDelay = retryDelay # default = 5 seconds
if autoStart:
self.start()
def log(self, msg):
return f'{self.type} {self.name} - {msg}'
async def awaitCoro(self):
while True:
self.retryCount += 1
logger.info(self.log(f'run task starting attempt {self.retryCount}/{self.maxRetryAttempts}'))
try:
self.runTask = asyncio.create_task(self.runCoro())
except Exception as err:
logger.error(self.log(f'error creating run task - {err}'))
else:
try:
await self.runTask
# self.retryCount = 0
except asyncio.exceptions.CancelledError as err:
logger.info(self.log('run task cancelled'))
except Exception as err:
logger.error(self.log(f'error - {err}'))
# if self.runTask.done():
# break
if (self.maxRetryAttempts > 0) and (self.retryCount >= self.maxRetryAttempts):
logger.warning(self.log('max retry attempts reached'))
break
logger.info(self.log(f'attempting restart in {self.retryDelay} seconds'))
await asyncio.sleep(self.retryDelay)
logger.info(self.log('supervise task cancelled'))
def start(self):
logger.info(self.log('starting'))
self.superviseTask = asyncio.create_task(self.awaitCoro())
# self.app.taskRegistry.addItem(self.name, self.superviseTask)
def shutdown(self):
logger.info(self.log('shutting down'))
self.superviseTask.cancel()
self.runTask.cancel()
def restart(self):
logger.info(self.log('restarting'))
self.shutdown()
self.start()
# socket server and client
class SocketBase():
def __init__(self, hostname:str='', ipaddr:str='', port:int=32843) -> None:
if hostname:
self.hostname = hostname
self.ipaddr = gethostbyname(hostname)
elif ipaddr:
self.hostname = ipaddr
self.ipaddr = ipaddr
else:
self.hostname = gethostname()
self.ipaddr = gethostbyname(self.hostname)
self.port = port
class SocketServer(SocketBase, AsyncServer):
def __init__(self, app, hostname: str = '', ipaddr: str = '', port: int = 32843, maxRetryAttempts:int=-1, retryDelay:int=5) -> None:
SocketBase.__init__(self, hostname, ipaddr, port)
AsyncServer.__init__(self, app, self.hostname, 'socket server', maxRetryAttempts=maxRetryAttempts, retryDelay=retryDelay)
self.app = app
async def runCoro(self) -> None:
try:
self.server = await asyncio.start_server(self.handler, self.ipaddr, self.port)
logger.info(self.log(f'listening on {self.ipaddr}:{self.port}'))
async with self.server:
await self.server.serve_forever()
except Exception as err:
logger.error(self.log(f'error starting server - {err}'))
async def handler(self, reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
logger.info(self.log('receiving message'))
async def main():
# logging
loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
await asyncio.sleep(0)
logger.info('servers program running')
tasks = set()
# socket server
socketServer = SocketServer(None)
tasks.add(socketServer.superviseTask)
result = 1/0
done, pending = await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
Python 3.12.3
Of course the 1/0
immediately terminates the main
. Almost everything you see happens during the asyncio cleanup. All tasks existing at that moment are cancelled and awaited, but your code does not react to the CancelledError as it should. See here:
try:
await self.runTask
# self.retryCount = 0
except asyncio.exceptions.CancelledError as err:
logger.info(self.log('run task cancelled'))
# --- added comment: missing raise here --
except Exception as err:
This fragment "swallows" the asyncio.CancelledError
and carries on as nothing had happened. This violates the "contract" and halts the cleanup.
This exception can be caught to perform custom operations when asyncio Tasks are cancelled. In almost all situations the exception must be re-raised.
Try again with the raise
added at the place marked by the comment.