Tags: python, asynchronous, python-asyncio

Gracefully terminate `asyncio` program in Python with a full queue


The Problem

I have a simplified example of an asynchronous program (Python 3.9) that is not working when exceptions are raised in futures, and I am looking for ways to gracefully terminate it.

In particular, when the number of failed requests exceeds the sum of the queue size and the number of workers, I end up in a situation where the queue is full (see COMMENT1 in the code snippet below) and I can no longer submit new requests, nor fail gracefully. I am looking for advice on how to avoid this situation by surfacing the fact that an exception occurred and shutting the entire program down. In this specific case I do not need to re-raise the specific errors of the workers, as those are logged. Thank you in advance!

import asyncio

HEALTHY_REQUESTS = 10
MAX_WORKERS = 3
QUEUE_SIZE = 19
# REQUESTS >= HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 1  ---> full queue, program stalls because queue is full
REQUESTS = HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 2


async def task_producer(requests):

    print(f'Starting task producer with: {requests}')
    request_queue = asyncio.Queue(maxsize=QUEUE_SIZE)  # starting tasks

    print('Initializing futures of the queue')
    request_workers = [asyncio.ensure_future(request_consumer(request_queue)) for _ in range(MAX_WORKERS)]

    # Submit requests
    for req in requests:
        print(f'Putting request {req} into queue, {request_queue.qsize()}')
        # COMMENT1: we get stuck here if
        # REQUESTS = HEALTHY_REQUESTS + (QUEUE_SIZE + MAX_WORKERS) + 2 or more.
        # We get stuck later if there is one less,
        # but I think we can ignore this fortunate case where we could still do something
        await request_queue.put(req)

    # Wait for all requests to come back
    await request_queue.put(None)
    await asyncio.wait(request_workers)

    print('Getting results from the parser')
    return 0


async def request_consumer(request_queue):
    print('request consumer started')
    while True:
        request_info = await request_queue.get()
        await asyncio.sleep(0.5)
        if request_info is None:
            await request_queue.put(None)
            break
        elif request_info > HEALTHY_REQUESTS:
            raise RuntimeError(f'Breaking thing in make requests with request={request_info}')
        print(f'{request_info} - request consumer is finished with request')


if __name__ == '__main__':
    requests = list(range(REQUESTS))
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    producer = loop.create_task(task_producer(requests))
    producer_result = loop.run_until_complete(producer)
    loop.close()
    print(f'Output of producer: {producer_result}')

Solution Attempts

Attempt 1

I tried adding a check for exceptions raised in the worker futures, which I placed in the for loop just before putting a request (see COMMENT1). The code snippet of the worker status check is presented below.

This works in the contrived example above, in the sense that it helps terminate the loop, but the program still gets stuck when putting None afterwards. In the actual program it does not behave the same way: somehow the queue gets filled before this check has a chance to return False.

def workers_are_healthy(request_workers):
    """Return False as soon as any worker future has finished with an exception."""
    for worker in request_workers:
        try:
            exception = worker.exception()
        except asyncio.InvalidStateError:
            # The worker is still running, so it has not failed (yet).
            continue
        if isinstance(exception, Exception):
            return False
    return True
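
Placed in the producer (at the COMMENT1 position), the check looks roughly like this; this is a sketch rather than the exact code:

async def task_producer(requests):
    request_queue = asyncio.Queue(maxsize=QUEUE_SIZE)
    request_workers = [asyncio.ensure_future(request_consumer(request_queue)) for _ in range(MAX_WORKERS)]

    for req in requests:
        # COMMENT1 position: stop submitting as soon as any worker has failed.
        if not workers_are_healthy(request_workers):
            break
        await request_queue.put(req)

    # The hang simply moves here: if all workers have died and the queue is
    # still full, this put() never completes.
    await request_queue.put(None)
    await asyncio.wait(request_workers)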

Attempt 2

I found something that works, but I really do not like it... If I create an instance variable in the class this code is packaged in (effectively a global variable in the example above), I can use it in a separate task to stop the program. In particular, I create another task, which is then passed into loop.run_until_complete. Inside this task I check every few seconds what the value of the global variable is. If an error occurs inside one of the workers (request_workers), I change its value and hope that this works. It feels very wrong... A rough sketch of the workaround is shown below.
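
In the standalone example the workaround looks roughly like this; the names error_occurred, failing_worker and watchdog are only illustrative:

import asyncio

error_occurred = False  # stand-in for the instance variable mentioned above

async def failing_worker():
    """Stand-in for a request worker that hits an error."""
    global error_occurred
    try:
        await asyncio.sleep(0.5)
        raise RuntimeError('Breaking things')
    except Exception:
        error_occurred = True  # flip the flag so the watchdog can shut things down
        raise

async def watchdog():
    """Poll the flag every few seconds; returning ends run_until_complete()."""
    while not error_occurred:
        await asyncio.sleep(2)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(failing_worker())
loop.run_until_complete(watchdog())  # the extra task the loop actually waits on
loop.close()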


Solution

  • With a little refactoring we can create a main task that monitors the completion of all tasks and gets notified as soon as any task ends with an exception.

    The main task creates the consumer and producer tasks. It then issues a call to asyncio.wait against the tasks specifying return_when=asyncio.FIRST_EXCEPTION so that it returns as soon as any task completes with an exception. wait returns a "done" and "pending" set. The "done" set is iterated to detect which task, if any, completed with an exception. The "pending" set is then iterated so that any pending tasks are canceled. If all tasks completed normally, then processing is continued by calling get_parser_results.

    import asyncio
    
    HEALTHY_REQUESTS = 10
    MAX_WORKERS = 3
    QUEUE_SIZE = 19
    REQUESTS = 30
    
    async def task_producer(request_queue, requests):
    
        print(f'Starting task producer with: {requests}')
    
        # Submit requests
        for req in requests:
            print(f'Putting request {req} into queue, {request_queue.qsize()}')
            await request_queue.put(req)
    
        for _ in range(MAX_WORKERS):
            await request_queue.put(None)  # Tell tasks to terminate:
    
    async def get_parser_results():
        """This is involed if and only if all requests have completed normally."""
        print('Getting results from the parser')
        return 0
    
    
    async def request_consumer(request_queue):
        print('request consumer started')
    
        while True:
            request_info = await request_queue.get()
            if request_info is None:
                break
    
            await asyncio.sleep(0.5)  # Emulate work
            if request_info > HEALTHY_REQUESTS:
                raise RuntimeError(f'Breaking thing in make requests with request={request_info}')
    
    async def main():
        request_queue = asyncio.Queue(maxsize=QUEUE_SIZE)  # starting tasks
        print('Initializing consumers')
        tasks = [asyncio.create_task(request_consumer(request_queue)) for _ in range(MAX_WORKERS)]
    
        requests = list(range(REQUESTS))
        producer_task = asyncio.create_task(task_producer(request_queue, requests))
        tasks.append(producer_task)
    
        # If no exception, then this call will only return when all tasks have completed
        # successfully and pending will be empty:
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
        had_exception = False
        for task in done:
            try:
                result = task.result()
            except Exception:
                had_exception = True
                print(f'Exception in task {task}')
                break
    
        # Now cancel any remaining tasks:
        if pending:
            for task in pending:
                task.cancel()
    
            # wait for all canceled tasks to finish
            await asyncio.wait(pending)
    
        if not had_exception:
            # All requests have completed normally, so
            # we can now get the parser results
            return await get_parser_results()
    
    if __name__ == '__main__':
        producer_result = asyncio.run(main())
        print(f'Output of producer: {producer_result}')
    

    Prints:

    Initializing consumers
    request consumer started
    request consumer started
    request consumer started
    Starting task producer with: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    Putting request 0 into queue, 0
    Putting request 1 into queue, 1
    Putting request 2 into queue, 2
    Putting request 3 into queue, 3
    Putting request 4 into queue, 4
    Putting request 5 into queue, 5
    Putting request 6 into queue, 6
    Putting request 7 into queue, 7
    Putting request 8 into queue, 8
    Putting request 9 into queue, 9
    Putting request 10 into queue, 10
    Putting request 11 into queue, 11
    Putting request 12 into queue, 12
    Putting request 13 into queue, 13
    Putting request 14 into queue, 14
    Putting request 15 into queue, 15
    Putting request 16 into queue, 16
    Putting request 17 into queue, 17
    Putting request 18 into queue, 18
    Putting request 19 into queue, 19
    Putting request 20 into queue, 17
    Putting request 21 into queue, 18
    Putting request 22 into queue, 19
    Putting request 23 into queue, 17
    Putting request 24 into queue, 18
    Putting request 25 into queue, 19
    Putting request 26 into queue, 17
    Putting request 27 into queue, 18
    Putting request 28 into queue, 19
    Putting request 29 into queue, 17
    Exception in task <Task finished name='Task-4' coro=<request_consumer() done, defined at C:\Booboo\test.py:27> exception=RuntimeError('Breaking thing in make requests with request=11')>
    Output of producer: None
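
    Note that the producer is itself one of the tasks handed to asyncio.wait, which is what removes the original stall: once a consumer fails, the producer, even if it is blocked on a put into a full queue, simply ends up in the "pending" set and is cancelled along with the other unfinished tasks. Because a task finished with an exception, get_parser_results is never called and main returns None, which is why the last line reads "Output of producer: None".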