python python-asyncio

How to keep track of ongoing tasks and prevent duplicate tasks?


I have a Python script that receives incoming messages from the web and processes the data:

async with asyncio.TaskGroup() as task_group:
    processor_task = task_group.create_task(processor.start(message), name=f"process_message_{message.sender_id}_task")

I name each task with an f-string so that, when debugging, I know which task is doing what and for whom.

Each iteration of message processing may take up to an hour.

I want to process only one message from a sender at a time. However, senders sometimes send multiple messages.

How can I drop further messages from the same sender_id until no task is running for that sender?

How can I check all running tasks for an existing name before creating a task in a TaskGroup?

I tried reading the TaskGroup documentation, but I didn't find anything related.


Solution

  • You can keep track of the senders that currently have a running task in a set, like this:

    import asyncio

    background_tasks = set()  # sender_ids that currently have a task running

    async with asyncio.TaskGroup() as task_group:
        sender_id = message.sender_id
        if sender_id not in background_tasks:  # no task is running for this sender
            processor_task = task_group.create_task(processor.start(message), name=f"process_message_{sender_id}_task")
            background_tasks.add(sender_id)  # mark the sender as busy
            # Bind sender_id as a default argument so the callback discards this sender,
            # not whatever sender_id refers to by the time the task finishes.
            processor_task.add_done_callback(lambda _task, sid=sender_id: background_tasks.discard(sid))
        else:
            print(f"user {sender_id} already has a task running")