Tags: exception, python-asyncio, python-3.12

Exception is cancelling tasks in an asyncio program - why?


In main() there is an intentional error (result = 1/0) that should raise an exception (in this case ZeroDivisionError), but no exception appears, even though nothing in my code catches it. Instead, some of the pending tasks are cancelled and the program keeps running. If you run the code with the intentional error included, the server needs two attempts to start (because the queued tasks are cancelled by the exception); with the error commented out, it starts on the first attempt. Shouldn't this error bring my program to a screeching halt?

async def main():
    """Excerpt of the program entry point, with a deliberate ZeroDivisionError."""
    # logging
    # The logging coroutine runs as its own task; sleep(0) yields to the event
    # loop once so that task gets a chance to start before the first log call.
    loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
    await asyncio.sleep(0)
    logger.info('servers program running')

    tasks = set()

    # socket server
    # Constructing the server already schedules its supervise task
    # (AsyncServer.__init__ calls start() because autoStart defaults to True).
    socketServer = SocketServer(None)
    tasks.add(socketServer.superviseTask)

    # Intentional error: raised after the tasks above have been created, so
    # the wait() below is never reached.
    result = 1/0    # <----- this should cause an exception but it's not

    done, pending = await asyncio.wait(tasks)

Full code:

import os.path as ospath
import asyncio
from socket import gethostbyname, gethostname
import logging
from logging.handlers import QueueHandler, QueueListener
from queue import Queue

# Root logger (getLogger() without a name); used by the classes and main() below.
logger = logging.getLogger()

async def initLogging(format='%(levelname)8s - %(asctime)s - %(name)s - %(msg)s', consoleLevel=logging.INFO, fileLevel=logging.DEBUG, fileName='log.log'):
    """Configure the root logger, then idle until this coroutine is cancelled.

    Two handlers are attached to the root logger: a file handler and a queue
    handler whose records are forwarded by a ``QueueListener`` to a
    ``StreamHandler``. The coroutine then sleeps in a loop; when it is
    cancelled (or otherwise exits) the listener is stopped.

    Args:
        format: Format string used for both the file and the queue handler.
        consoleLevel: Minimum level of records passed on to the console
            listener.
        fileLevel: Minimum level of records written to the log file.
        fileName: Path of the log file; it is opened in ``'w'`` mode, so an
            existing file is truncated.
    """
    # Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
    logger = logging.getLogger()
    logger.setLevel(logging.NOTSET)

    # Add file handler
    fileHandler = logging.FileHandler(filename=fileName, mode='w')
    fileHandler.setLevel(fileLevel)
    fileHandler.setFormatter(logging.Formatter(format))
    logger.addHandler(fileHandler)

    # Add queue handler
    queue = Queue()
    queueHandler = QueueHandler(queue)
    # Honour the caller's consoleLevel (previously hard-coded to INFO).
    queueHandler.setLevel(consoleLevel)
    queueHandler.setFormatter(logging.Formatter(format))
    logger.addHandler(queueHandler)
    listener = QueueListener(queue, logging.StreamHandler())
    try:
        listener.start()
        logger.info('logger started')
        # Nothing to do here; stay alive so the finally block runs on cancel.
        while True:
            await asyncio.sleep(10)
    finally:
        # Log first so the message is still forwarded before the listener stops.
        logger.info('logger finished')
        # Must be *called*: a bare ``listener.stop`` leaves the listener
        # thread running.
        listener.stop()

# async server base class

class AsyncServer():
    def __init__(self, app, name:str, type:str, maxRetryAttempts:int=-1, retryDelay:int=5, autoStart:bool=True) -> None:
        self.app = app
        self.name = name
        self.type = type
        self.retryCount = 0
        self.maxRetryAttempts = maxRetryAttempts # default -1 = unlimited
        self.retryDelay = retryDelay # default = 5 seconds
        if autoStart:
            self.start()

    def log(self, msg):
        return f'{self.type} {self.name} - {msg}'

    async def awaitCoro(self):
        while True:
            self.retryCount += 1
            logger.info(self.log(f'run task starting attempt {self.retryCount}/{self.maxRetryAttempts}'))
            try:
                self.runTask = asyncio.create_task(self.runCoro())
            except Exception as err:
                logger.error(self.log(f'error creating run task - {err}'))
            else:
                try:
                    await self.runTask
                    # self.retryCount = 0
                except asyncio.exceptions.CancelledError as err:
                    logger.info(self.log('run task cancelled'))
                except Exception as err:
                    logger.error(self.log(f'error - {err}'))
            # if self.runTask.done():
            #   break
            if (self.maxRetryAttempts > 0) and (self.retryCount >= self.maxRetryAttempts):
                logger.warning(self.log('max retry attempts reached'))
                break
            logger.info(self.log(f'attempting restart in {self.retryDelay} seconds'))
            await asyncio.sleep(self.retryDelay)
        logger.info(self.log('supervise task cancelled'))

    def start(self):
        logger.info(self.log('starting'))
        self.superviseTask = asyncio.create_task(self.awaitCoro())
        # self.app.taskRegistry.addItem(self.name, self.superviseTask)

    def shutdown(self):
        logger.info(self.log('shutting down'))
        self.superviseTask.cancel()
        self.runTask.cancel()
    
    def restart(self):
        logger.info(self.log('restarting'))
        self.shutdown()
        self.start()

# socket server and client

class SocketBase():
    """Holds the host name, IP address and port of a socket endpoint."""

    def __init__(self, hostname:str='', ipaddr:str='', port:int=32843) -> None:
        """Resolve the endpoint from whichever identifier was supplied.

        A non-empty ``hostname`` wins and is resolved to an address; otherwise
        a non-empty ``ipaddr`` is used for both fields; with neither, the
        local machine's host name is looked up and resolved.
        """
        if hostname:
            name, address = hostname, gethostbyname(hostname)
        elif ipaddr:
            name = address = ipaddr
        else:
            name = gethostname()
            address = gethostbyname(name)
        self.hostname = name
        self.ipaddr = address
        self.port = port

class SocketServer(SocketBase, AsyncServer):
    """TCP server whose listening coroutine is supervised by ``AsyncServer``."""

    def __init__(self, app, hostname: str = '', ipaddr: str = '', port: int = 32843, maxRetryAttempts:int=-1, retryDelay:int=5) -> None:
        # Resolve the address first: AsyncServer.__init__ uses self.hostname as
        # the server name. It also stores ``app``, so no second assignment is
        # needed here.
        SocketBase.__init__(self, hostname, ipaddr, port)
        AsyncServer.__init__(self, app, self.hostname, 'socket server', maxRetryAttempts=maxRetryAttempts, retryDelay=retryDelay)

    async def runCoro(self) -> None:
        """Open the listening socket and serve until cancelled or failed.

        Exceptions other than cancellation are logged and not re-raised, so
        the coroutine then simply returns.
        """
        try:
            self.server = await asyncio.start_server(self.handler, self.ipaddr, self.port)
            logger.info(self.log(f'listening on {self.ipaddr}:{self.port}'))
            async with self.server:
                await self.server.serve_forever()
        except Exception as err:
            logger.error(self.log(f'error starting server - {err}'))

    async def handler(self, reader:asyncio.StreamReader, writer:asyncio.StreamWriter):
        """Handle one client connection: log it, then close the connection."""
        try:
            logger.info(self.log('receiving message'))
        finally:
            # Returning without closing the writer would leave the connection
            # open until the stream objects are garbage collected.
            writer.close()
            await writer.wait_closed()

async def main():
    """Program entry point; contains a deliberate ZeroDivisionError."""
    # logging
    # The logging coroutine runs as its own task; sleep(0) yields to the event
    # loop once so that task gets a chance to start before the first log call.
    loggerTask = asyncio.create_task(initLogging(fileName=ospath.join('log','servers.log')))
    await asyncio.sleep(0)
    logger.info('servers program running')

    tasks = set()
    
    # socket server
    # Constructing the server already schedules its supervise task
    # (AsyncServer.__init__ calls start() because autoStart defaults to True).
    socketServer = SocketServer(None)
    tasks.add(socketServer.superviseTask)

    # Intentional error: raised after the tasks above have been created, so
    # the wait() below is never reached.
    result = 1/0

    done, pending = await asyncio.wait(tasks)

# Run main() in a new event loop when executed as a script.
if __name__ == '__main__':
    asyncio.run(main())

Python 3.12.3


Solution

  • The 1/0 does terminate main() immediately, as expected. Almost everything you see after that happens during the cleanup performed by asyncio.run(): all tasks that exist at that moment are cancelled and then awaited, but your code does not react to the CancelledError as it should. See here:

                try:
                    await self.runTask
                    # self.retryCount = 0
                except asyncio.exceptions.CancelledError as err:
                    logger.info(self.log('run task cancelled'))
                    # --- added comment: missing raise here --
                except Exception as err:
    

    This fragment "swallows" the asyncio.CancelledError and carries on as if nothing had happened. This violates the "contract" and halts the cleanup.

    This exception can be caught to perform custom operations when asyncio Tasks are cancelled. In almost all situations the exception must be re-raised.

    Try again with the raise added at the place marked by the comment.