python, airflow

How to change the order and number of tasks to be executed in Airflow


I want to create a DAG workflow with 5 different branches, as shown in the graph I posted.

The base DAG is this:

from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy import DummyOperator

def get_path(**kwargs):
    params = kwargs.get('params',{})
    if params.get('path') == '1':
        return 'task_2_a'
    elif params.get('path') == '2':
        return 'task_2_b'
    elif params.get('path') == '3':
        return 'task_2_c'
    elif params.get('path') == '4':
        return ['task_2_a','task_2_b']
    else:
        return ['task_2_a','task_2_b', 'task_2_c']


with DAG(
        'test',
        description='test',
        tags=["test"],
        schedule_interval=None,
        start_date=datetime(2025, 7, 1),
        default_args={
            'retries': 0,
            'retry_delay': timedelta(minutes=1),
            'conn_id': 'sgk_gp'
        },
        params={
            'name':'',
            'path':''
        }
) as dag:

    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql=f"""
            drop table if exists {{{{dag_run.conf.name}}}};
            create table {{{{dag_run.conf.name}}}} (
              some_text character varying
            )
        """
    )

    branch_1 = BranchPythonOperator(
        task_id='branch_1',
        python_callable=get_path,
        provide_context=True,
        do_xcom_push=False
    )

    task_2_a = SQLExecuteQueryOperator(
        task_id='task_2_a',
        sql=f"""
            insert into {{{{dag_run.conf.name}}}} (some_text)
            select some_text from (select 'aaa' as some_text) as tab
        """
    )

    task_2_b = SQLExecuteQueryOperator(
        task_id='task_2_b',
        sql=f"""
            insert into {{{{dag_run.conf.name}}}} (some_text)
            select some_text from (select 'bbb' as some_text) as tab
        """
    )

    task_2_c = SQLExecuteQueryOperator(
        task_id='task_2_c',
        sql=f"""
            insert into {{{{dag_run.conf.name}}}} (some_text)
            select some_text from (select 'ccc' as some_text) as tab
        """
    )

    task_3 = SQLExecuteQueryOperator(
        task_id='task_3',
        sql=f"""
            insert into {{{{dag_run.conf.name}}}} (some_text)
            select some_text from (select '333' as some_text) as tab
        """
    )

    complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)

I tried setting the workflow like this initially:

task_1 >> branch_1 >> [task_2_a >> task_2_b >> task_2_c] >> task_3 >> complete

But this would lead to the task_2_* tasks being executed in parallel if branch 4 or 5 was chosen, and I need them to run strictly in order.

Then I tried doing this, but this did not yield the desired results either.

    task_1 >> branch_1 >> task_2_a >> task_3 >> complete
    task_1 >> branch_1 >> task_2_b >> task_3 >> complete
    task_1 >> branch_1 >> task_2_c >> task_3 >> complete
    task_1 >> branch_1 >> task_2_a >> task_2_b >> task_3 >> complete
    task_1 >> branch_1 >> task_2_a >> task_2_b >> task_2_c >> task_3 >> complete

I had an idea to implement multiple branch operators, but in my opinion this would lead to a very confusing structure. Is there a simple way to achieve this?


Solution

  • If you want to achieve the exact graph you posted, you have to accept some duplicated code (a code smell) in your DAG.

    Here is a working example of your DAG:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import BranchPythonOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.operators.dummy import DummyOperator
    
    
    def get_path(**kwargs):
        params = kwargs.get('params',{})
        if params.get('path') == '1':
            return 'task_2_a'
        elif params.get('path') == '2':
            return 'task_2_b'
        elif params.get('path') == '3':
            return 'task_2_c'
        elif params.get('path') == '4':
            # start the dedicated sequential two-task chain instead of the parallel tasks
            return 'task_2_a_1'
        else:
            # start the dedicated sequential three-task chain
            return 'task_2_a_2'
    
    
    with DAG(
            'test',
            description='test',
            tags=["test"],
            schedule_interval=None,
            start_date=datetime(2025, 7, 1),
            default_args={
                'retries': 0,
                'retry_delay': timedelta(minutes=1),
                'conn_id': 'sgk_gp'
            },
            params={
                'name':'',
                'path':''
            }
    ) as dag:
    
        task_1 = SQLExecuteQueryOperator(
            task_id='task_1',
            sql=f"""
                drop table if exists {{{{dag_run.conf.name}}}};
                create table {{{{dag_run.conf.name}}}} (
                  some_text character varying
                )
            """
        )
    
        branch_1 = BranchPythonOperator(
            task_id='branch_1',
            python_callable=get_path,
            provide_context=True,
            do_xcom_push=False
        )
    
        task_2_a = SQLExecuteQueryOperator(
            task_id='task_2_a',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'aaa' as some_text) as tab
            """
        )
    
        task_2_b = SQLExecuteQueryOperator(
            task_id='task_2_b',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'bbb' as some_text) as tab
            """
        )
    
        task_2_b_1 = SQLExecuteQueryOperator(
            task_id='task_2_b_1',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'bbb' as some_text) as tab
            """
        )

        task_2_a_1 = SQLExecuteQueryOperator(
            task_id='task_2_a_1',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'aaa' as some_text) as tab
            """
        )

        task_2_a_2 = SQLExecuteQueryOperator(
            task_id='task_2_a_2',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'aaa' as some_text) as tab
            """
        )

        task_2_c = SQLExecuteQueryOperator(
            task_id='task_2_c',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'ccc' as some_text) as tab
            """
        )

        task_2_c_1 = SQLExecuteQueryOperator(
            task_id='task_2_c_1',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'ccc' as some_text) as tab
            """
        )

        task_2_c_2 = SQLExecuteQueryOperator(
            task_id='task_2_c_2',
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select 'ccc' as some_text) as tab
            """
        )
    
        task_3 = SQLExecuteQueryOperator(
            task_id='task_3',
            # none_failed lets task_3 run even though the tasks of the
            # branches that were not chosen are skipped
            trigger_rule=TriggerRule.NONE_FAILED,
            sql=f"""
                insert into {{{{dag_run.conf.name}}}} (some_text)
                select some_text from (select '333' as some_text) as tab
            """
        )
    
        complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)
    
        task_1 >> branch_1
        branch_1 >> [task_2_a, task_2_b, task_2_c, task_2_a_1, task_2_a_2]
        task_2_a_1 >> task_2_c_1 >> task_3
        task_2_a_2 >> task_2_b_1 >> task_2_c_2 >> task_3
        [task_2_a, task_2_b, task_2_c] >> task_3
        task_3 >> complete
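
    Note that for paths 4 and 5 the branch callable returns the first task of the
    corresponding sequential chain (task_2_a_1 or task_2_a_2): BranchPythonOperator skips
    every direct downstream task it does not return, and those skips cascade down the
    unused chains. For the same reason task_3 gets trigger_rule=TriggerRule.NONE_FAILED,
    otherwise it would be skipped itself, because on every run some of its upstream tasks
    are skipped by the branch.

    A run that exercises, for example, the two-task chain can be triggered from the CLI
    with something like the following (the table name is only a placeholder):

        airflow dags trigger test --conf '{"name": "some_table", "path": "4"}'

    Assuming core.dag_run_conf_overrides_params is enabled (the default in Airflow 2), the
    path value from --conf fills the declared path param that get_path reads, while name is
    used by the templated SQL statements.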