
Airflow tasks in specific batches


I want to run a set of tasks like this:

a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]

First run task a. When that is done, run b, c, d in parallel. Then, when the last of b, c, d is done, start running e, f, g in parallel, and so on.

But I'm getting an error: unsupported operand type(s) for >>: 'list' and 'list'

What is the correct syntax for what I want to do?


Solution

  • The error occurs because dependencies between two lists cannot be set with the bitshift operator. Python lists do not define >>, so [task_a, task_b] >> [task_c, task_d] raises a TypeError. Only task >> list and list >> task work, because one side is an Airflow task.
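
To see why a task on either side works but two lists do not, here is a minimal plain-Python sketch. The `Task` class is a hypothetical stand-in written for this illustration, not Airflow's operator class; it only mimics the idea of defining both `__rshift__` and the reflected `__rrshift__`.

```python
class Task:
    """Hypothetical stand-in for an Airflow task (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # ids of tasks that run after this one

    @staticmethod
    def _as_list(other):
        return other if isinstance(other, list) else [other]

    def __rshift__(self, other):
        # Handles `task >> task` and `task >> [tasks]`.
        for t in self._as_list(other):
            self.downstream.append(t.task_id)
        return other

    def __rrshift__(self, other):
        # Handles `[tasks] >> task`: list has no __rshift__, so Python
        # falls back to the right operand's reflected method.
        for t in self._as_list(other):
            t.downstream.append(self.task_id)
        return self


a, b, c, d = (Task(name) for name in "abcd")
a >> [b, c] >> d  # works: a Task is on one side of every >>

try:
    [b, c] >> [d, a]  # neither operand is a Task, so nothing handles >>
except TypeError as err:
    print(err)
```

With two lists, neither operand knows how to handle `>>`, so Python raises the error before any task code runs.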

    IMHO the easiest and cleanest way to achieve what you are looking for (there are others) is to use TaskGroup and set dependencies between the groups, like this:


    from time import sleep
    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.task_group import TaskGroup

    default_args = {
        'start_date': days_ago(1)
    }


    def _execute_task(**kwargs):
        # Print the running task's id, then sleep to simulate work
        print(f"Task_id: {kwargs['ti'].task_id}")
        sleep(10)


    def _create_python_task(name):
        # Created inside a TaskGroup block, the task joins that group
        return PythonOperator(
            task_id=f'task_{name}',
            python_callable=_execute_task)


    with DAG('parallel_tasks_example', schedule_interval='@once',
             default_args=default_args, catchup=False) as dag:

        task_a = DummyOperator(task_id='task_a')

        # Tasks b, c, d run in parallel
        with TaskGroup('first_group') as first_group:
            for name in 'bcd':
                _create_python_task(name)

        # Tasks e, f, g run in parallel
        with TaskGroup('second_group') as second_group:
            for name in 'efg':
                _create_python_task(name)

        # Tasks h, i, j, k, l, m run in parallel
        with TaskGroup('third_group') as third_group:
            for name in 'hijklm':
                _create_python_task(name)

        # Each group starts only after the previous one has finished
        task_a >> first_group >> second_group >> third_group
    

    From the TaskGroup class definition:

    A collection of tasks. When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across all tasks within the group if necessary.

    You can find an official example of this here.
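
As a rough illustration of what batch-to-batch dependencies amount to when the tasks inside each group do not depend on each other (as in the DAG above), the sketch below lists the edges in plain Python. `batch_edges` is a hypothetical helper written for this example, not part of Airflow, and it is not how TaskGroup is implemented.

```python
from itertools import product


def batch_edges(batches):
    """Return (upstream, downstream) pairs linking every task in a batch
    to every task in the batch that follows it."""
    edges = []
    for current, following in zip(batches, batches[1:]):
        edges.extend(product(current, following))
    return edges


# The batches from the question: a, then b-d, then e-g, then h-m.
batches = [["a"], list("bcd"), list("efg"), list("hijklm")]
edges = batch_edges(batches)

for upstream, downstream in edges[:5]:
    print(f"{upstream} >> {downstream}")
```

Every task waits for all tasks in the previous batch, and tasks within the same batch have no edges between them, so they can run in parallel. If you prefer plain lists over groups, Airflow also provides a `cross_downstream` helper in `airflow.models.baseoperator` that sets this all-to-all pattern between two lists of tasks.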