airflowairflow-2.x

Limiting concurrency of parallel task execution


I want to limit the number of airflow tasks running in parallel to avoid overwhelming affected resources such as AWS S3. Here is an example of non-working code, all 20 tasks from the first group start simultaneously rather than only 10. This code is meant to run on AWS MWAA, v2.7

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
}

ACTIVE_PARTNERS = [{"id": n} for n in range(1, 21)]

def simulate_task_execution(sleep_time):
    print(f"Simulating task execution by sleeping for {sleep_time} seconds.")
    time.sleep(sleep_time)

with DAG(
        dag_id="parallel_lanes_with_limited_first_task",
        default_args=DEFAULT_ARGS,
        start_date=datetime(2024, 1, 1, 1, 0, 0),
        schedule_interval="0 * * * *",
        max_active_runs=1,
        tags=[],
        catchup=False,
) as dag:
    with TaskGroup("athena_section") as athena_section:
        athena_section.dag.max_active_tasks_per_dag=10 # not working
        for partner in ACTIVE_PARTNERS:
            athena_insert = PythonOperator(
                task_id=f"partner_{partner['id']}_athena_insert",
                python_callable=simulate_task_execution,
                task_concurrency=10, # not working
                op_args=[30],  # Sleep for N seconds
            )
            athena_insert

    with TaskGroup("ecs_section") as ecs_section:
        athena_section.dag.max_active_tasks_per_dag=100 # not working
        for partner in ACTIVE_PARTNERS:
            ecs_operators = PythonOperator(
                task_id=f"data_to_dynamodb_ecs_task_{partner['id']}",
                python_callable=simulate_task_execution,
                op_args=[5],  # Sleep for N seconds
            )
            ecs_operators

    athena_section >> ecs_section

I tried concurency, task_concurrency and max_active_tasks_per_dag flags.


Solution

  • Pools is what you're looking for here, see:

    You can create a pool with a certain number of "slots", say 5. Then specify that pool in the task:

    PythonOperator(
        task_id=f"partner_{partner['id']}_athena_insert",
        python_callable=simulate_task_execution,
        pool="my_pool_name",
        op_args=[30],  # Sleep for N seconds
    )
    

    This ensures at most 5 of these tasks will run in parallel.

    task_concurrency controls the maximum parallel runs of that one specific task across your Airflow instance. That means if you configure task_concurrency=10, you limit every partner_{partner['id']}_athena_insert task to at most 10 parallel runs. So if you'd have 20 partners in ACTIVE_PARTNERS, that could theoretically run 20*10=200 of such tasks in parallel.