Tags: python, python-asyncio, fastapi

FastAPI with asyncio | Running long-running background tasks


I'm running a heavy async task in FastAPI and need to return a response to the user while the task continues in the background with asyncio. When I await the task, it works perfectly, but when I schedule it on the event loop with asyncio.create_task without awaiting it, it randomly stops executing.

Any insights on best practices for handling this?

The end-to-end function, with some abstractions, is below:

async def from_template_to_conv(
    transaction_id: str,
    db,
):
    print("Getting templates")
    transaction = await fetch_transaction_with_workstreams(db, transaction_id)
    if len(transaction.workstreams) > 0:
        print("Transaction already has workstreams. Aborting.")
        return

    # If transaction is not found, raise a 404 error
    print("Creating workstreams")
    response = await process_transaction_based_on_template(db, transaction=transaction)
    print("Workstream created successfully.")

    questions = await fetch_transaction_questions(db, transaction_id)
    question_dics = [
        {
            "id": q.id,
            "content": q.content,
        }
        for q in questions
    ]
    executor = ThreadPoolExecutor(max_workers=1)

    # Define the run_heavy_task function to process all questions
    async def run_heavy_task(transaction_id, questions, db, executor):

        for question in questions:
            transaction = await get_transaction(db, transaction_id)
            print(f"Fetched transaction {transaction_id}.")

            users = await get_users_by_organization(db, transaction.organization_id)
            print(f"Fetched users for transaction {transaction_id}.")
            user_ids = [u.id for u in users]
            print(user_ids)
            print(f"Automating questionnaire for question {question['id']}.")
            conversation = await create_automation_conversation(
                db, user_ids, "deals", transaction.organization_id, transaction
            )
            print(f"Created conversation for transaction {transaction_id}.")

            response = await process_message_conversation(
                question["id"],
                conversation,
                conversation.id,
                question["content"],
                user_ids,
                db,
            )
            print(f"Processed message for question {question['id']}.")

            final_message = None
            async for message in response.body_iterator:
                final_message = message

            if final_message:
                print(f"Final message received for question {question['id']}: {schema.Message.parse_raw(final_message)}")
            else:
                raise HTTPException(status_code=500, detail="Internal server error")

    # Create a single task to process all questions
    task = asyncio.create_task(run_heavy_task(transaction_id, question_dics, db, executor))
    
    # await task  # Ensure the task is awaited and executed

    return response

Solution

  • As the linked answer explains, you didn't keep a strong reference to the task. The event loop only holds weak references to tasks, so the garbage collector can clear an unreferenced task at an unpredictable time, before it has finished.

    You can keep the task in a global task map, as suggested in the linked question, or, better, use fastapi.BackgroundTasks, which avoids globals and makes sure the task runs to completion after the response is sent.

    If you eventually want to run tasks that never finish, you'd be better off switching to a pub-sub model.
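
The strong-reference approach mentioned above can be sketched with plain asyncio. This is a minimal illustration, not the asker's code: `spawn`, `heavy_task`, `handler` and the module-level `background_tasks` set are hypothetical names, and `heavy_task` is only a stand-in for `run_heavy_task` from the question.

```python
import asyncio

# Module-level set holding strong references to in-flight tasks.
# The event loop itself only keeps weak references to them.
background_tasks: set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    """Schedule a coroutine and keep the task alive until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow forever.
    task.add_done_callback(background_tasks.discard)
    return task


async def heavy_task(results: list, n: int) -> None:
    # Hypothetical stand-in for run_heavy_task in the question.
    for i in range(n):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        results.append(i)


async def handler(results: list) -> str:
    # Return immediately; the task keeps running in the background.
    spawn(heavy_task(results, 3))
    return "accepted"


async def main() -> list:
    results: list = []
    response = await handler(results)
    assert response == "accepted"
    # A real server keeps its loop running; this demo has to wait explicitly.
    await asyncio.gather(*background_tasks)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With fastapi.BackgroundTasks the same idea would likely reduce to declaring a `background_tasks: BackgroundTasks` parameter on the endpoint and calling `background_tasks.add_task(run_heavy_task, transaction_id, question_dics, db, executor)`, assuming `run_heavy_task` is moved out of the enclosing function so it can be passed by reference.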