I want to create a dag workflow with 5 different branches as follows:

The base dag is this:
from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy import DummyOperator
def get_path(**kwargs):
    """Branch callable: choose the downstream task id(s) from the 'path' param.

    Paths '1'-'3' select a single task, '4' selects a and b, and any other
    value (including missing) selects all three.
    """
    routes = {
        '1': 'task_2_a',
        '2': 'task_2_b',
        '3': 'task_2_c',
        '4': ['task_2_a', 'task_2_b'],
    }
    selected = kwargs.get('params', {}).get('path')
    return routes.get(selected, ['task_2_a', 'task_2_b', 'task_2_c'])
with DAG(
    'test',
    description='test',
    tags=["test"],
    schedule_interval=None,  # manually triggered only
    start_date=datetime(2025, 7, 1),
    default_args={
        'retries': 0,
        'retry_delay': timedelta(minutes=1),
        # Connection used by every SQLExecuteQueryOperator in this DAG.
        'conn_id': 'sgk_gp',
    },
    # Run-time parameters: `name` is the target table, `path` picks a branch.
    params={
        'name': '',
        'path': '',
    },
) as dag:
    # NOTE: the sql strings are plain strings, not f-strings.
    # {{dag_run.conf.name}} is Jinja that Airflow renders at run time; the
    # original used f-strings with quadruple braces ({{{{...}}}}), which
    # rendered to the same text but obscured the intent.
    # NOTE(review): interpolating a table name from dag_run.conf is an SQL
    # injection risk if untrusted users can trigger this DAG.
    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql="""
            drop table if exists {{dag_run.conf.name}};
            create table {{dag_run.conf.name}} (
                some_text character varying
            )
        """,
    )
    branch_1 = BranchPythonOperator(
        task_id='branch_1',
        python_callable=get_path,
        # `provide_context` was dropped: it is removed in Airflow 2, where
        # the execution context is always passed to the callable as **kwargs.
        do_xcom_push=False,
    )
    task_2_a = SQLExecuteQueryOperator(
        task_id='task_2_a',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'aaa' as some_text) as tab
        """,
    )
    task_2_b = SQLExecuteQueryOperator(
        task_id='task_2_b',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'bbb' as some_text) as tab
        """,
    )
    task_2_c = SQLExecuteQueryOperator(
        task_id='task_2_c',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'ccc' as some_text) as tab
        """,
    )
    task_3 = SQLExecuteQueryOperator(
        task_id='task_3',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select '333' as some_text) as tab
        """,
    )
    # NONE_FAILED lets `complete` run even when branch-skipped tasks sit
    # upstream. DummyOperator is deprecated in Airflow 2.4+; EmptyOperator
    # is the drop-in replacement (kept to match the file's imports).
    complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)
I tried setting the workflow like this initially:
task_1 >> branch_1 >> [task_2_a >> task_2_b >> task_2_c] >> task_3 >> complete
But this would lead to the task_2_* tasks being executed in parallel if branch 4 or 5 was chosen, and I need them to run strictly in order.
Then I tried doing this, but it did not yield the desired results either.
task_1 >> branch_1 >> task_2_a >> task_3 >> complete
task_1 >> branch_1 >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_c >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_2_c >> task_3 >> complete
I had an idea to implement multiple branch operators, but in my opinion this would lead to a very confusing structure. Is there a simple way to achieve this?
If you want to achieve the exact graph you posted, you have to accept some duplicated code (code smells aside).
Here is a working example of your DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy import DummyOperator
def get_path(**kwargs):
    """Branch callable: map the 'path' run parameter to follow-up task id(s).

    '1'/'2'/'3' pick a single task, '4' picks a and b, anything else all three.
    """
    # NOTE(review): these ids only cover the parallel task_2_* tasks; the
    # sequential task_2_*_1/_2 chains wired elsewhere in the DAG are never
    # selected by this callable -- confirm that is intended.
    path = kwargs.get('params', {}).get('path')
    if path == '1':
        return 'task_2_a'
    if path == '2':
        return 'task_2_b'
    if path == '3':
        return 'task_2_c'
    if path == '4':
        return ['task_2_a', 'task_2_b']
    return ['task_2_a', 'task_2_b', 'task_2_c']
with DAG(
    'test',
    description='test',
    tags=["test"],
    schedule_interval=None,  # manually triggered only
    start_date=datetime(2025, 7, 1),
    default_args={
        'retries': 0,
        'retry_delay': timedelta(minutes=1),
        # Connection used by every SQLExecuteQueryOperator in this DAG.
        'conn_id': 'sgk_gp',
    },
    # Run-time parameters: `name` is the target table, `path` picks a branch.
    params={
        'name': '',
        'path': '',
    },
) as dag:
    # NOTE: the sql strings are plain strings, not f-strings.
    # {{dag_run.conf.name}} is Jinja that Airflow renders at run time; the
    # original used f-strings with quadruple braces ({{{{...}}}}), which
    # rendered to the same text but obscured the intent.
    # NOTE(review): interpolating a table name from dag_run.conf is an SQL
    # injection risk if untrusted users can trigger this DAG.
    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql="""
            drop table if exists {{dag_run.conf.name}};
            create table {{dag_run.conf.name}} (
                some_text character varying
            )
        """,
    )
    branch_1 = BranchPythonOperator(
        task_id='branch_1',
        python_callable=get_path,
        # `provide_context` was dropped: it is removed in Airflow 2, where
        # the execution context is always passed to the callable as **kwargs.
        do_xcom_push=False,
    )
    # --- Parallel variants (paths 1-3 and the list-returning branches) ---
    task_2_a = SQLExecuteQueryOperator(
        task_id='task_2_a',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'aaa' as some_text) as tab
        """,
    )
    task_2_b = SQLExecuteQueryOperator(
        task_id='task_2_b',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'bbb' as some_text) as tab
        """,
    )
    task_2_c = SQLExecuteQueryOperator(
        task_id='task_2_c',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'ccc' as some_text) as tab
        """,
    )
    # --- Duplicated tasks that force strictly sequential execution.
    # Duplication is deliberate: each chain needs its own task instances so
    # the scheduler cannot run them in parallel.
    task_2_a_1 = SQLExecuteQueryOperator(
        task_id='task_2_a_1',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'aaa' as some_text) as tab
        """,
    )
    task_2_c_1 = SQLExecuteQueryOperator(
        task_id='task_2_c_1',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'ccc' as some_text) as tab
        """,
    )
    task_2_a_2 = SQLExecuteQueryOperator(
        task_id='task_2_a_2',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'aaa' as some_text) as tab
        """,
    )
    task_2_b_1 = SQLExecuteQueryOperator(
        task_id='task_2_b_1',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'bbb' as some_text) as tab
        """,
    )
    task_2_c_2 = SQLExecuteQueryOperator(
        task_id='task_2_c_2',
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select 'ccc' as some_text) as tab
        """,
    )
    # Final insert. With the default `all_success` trigger rule task_3 would
    # be skipped on every run, because the branch always skips some of its
    # many parents and skips propagate downstream. NONE_FAILED_MIN_ONE_SUCCESS
    # makes it run after whichever path actually executed.
    task_3 = SQLExecuteQueryOperator(
        task_id='task_3',
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
        sql="""
            insert into {{dag_run.conf.name}} (some_text)
            select some_text from (select '333' as some_text) as tab
        """,
    )
    # DummyOperator is deprecated in Airflow 2.4+; EmptyOperator is the
    # drop-in replacement (kept to match the file's imports).
    complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)

    task_1 >> branch_1
    # NOTE(review): get_path never returns the task_2_*_1/_2 ids, so the
    # sequential chains below are skipped on every run -- confirm the branch
    # callable is meant to return 'task_2_a_1' (path 4) / 'task_2_a_2' (else).
    branch_1 >> [task_2_a, task_2_b, task_2_c, task_2_a_1, task_2_a_2]
    # `a >> b` returns b, so the original `[task_2_a_1 >> task_2_c_1] >> task_3`
    # was equivalent to these explicit, clearer chains:
    task_2_a_1 >> task_2_c_1 >> task_3
    task_2_a_2 >> task_2_b_1 >> task_2_c_2 >> task_3
    [task_2_a, task_2_b, task_2_c] >> task_3
    task_3 >> complete