I have a controller DAG that calls an API every 5 minutes and then triggers the corresponding DAGs.
It works and triggers the corresponding DAGs, but the controller DAG completes only after all the triggered DAGs have completed. I tried setting `wait_for_completion=False`, but it still waits for the triggered DAGs.
# First attempt from the question, kept as an illustration of the anti-pattern.
# Everything inside this `with DAG(...)` block is module top-level code, so it
# runs every time Airflow parses this file, not when the DAG runs.
with DAG(
    dag_id="controller_dag_per",
    start_date=pendulum.datetime(2024, 4, 22, tz="UTC"),
    schedule="0/5 * * * *",
    catchup=False,
    is_paused_upon_creation=False,
    max_active_runs=100,
    max_active_tasks=50,
) as dag:
    # NOTE(review): `//` is not Python syntax. The next line is a placeholder for
    # the external API call and is a SyntaxError as written. Even once filled in,
    # the API would be called on every parse of this file.
    results_list = // calling an external api fetch 100 records
    # Creates one static task per API record. The DAG's task set therefore
    # depends on whatever the API returned at parse time.
    for index, item in enumerate(results_list):
        task_id, dag_id = None, None
        # Always true: `index` comes from enumerate(results_list), so this check
        # filters nothing.
        if index in range(len(results_list)):
            dag_id = f"test_dag_{item['dag_id']}_dag"
            task_id = f"{item['dag_id']}_{item['my_var']}_{index}"
        try:
            TriggerDagRunOperator(task_id=f"{task_id}", trigger_dag_id=f"{dag_id}",
                reset_dag_run=True, wait_for_completion=False, conf={ } )
        # NOTE(review): silently discards any error raised while constructing the
        # operator (e.g. a duplicate task_id). `e` is never used.
        except Exception as e:
            pass
Another issue with this code is that it runs at the top level: whenever Airflow parses the DAG files, the code placed directly under the DAG gets executed. So I moved the code inside a `@task`, but now the task succeeds while the DAGs are not triggered.
# Second attempt from the question, as pseudocode (`...` elides details, and the
# `with` line lacks its colon, so this is not runnable as-is). The API work is
# moved into a @task so it runs at DAG run time instead of at parse time.
with DAG(...)
    @task()
    def task_triggger_dags():
        for ...
            # NOTE(review): this only constructs an operator object inside the
            # running task. Nothing schedules or executes it, so no DAG run is
            # created, which matches the reported "task works, DAGs not triggered".
            TriggerDagRunOperator(...)
    task_triggger_dags()
How can I make the controller DAG complete immediately after triggering the DAGs (which is how it is supposed to work by default)? Is there any way to trigger DAGs from a task?
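Never run custom code at the top level of an Airflow DAG file.
The DAG file is not for you; it is for Airflow. Airflow re-parses the file periodically, so any top-level code (such as the API call) runs on every parse. Likewise, an operator instantiated inside a running task is just a Python object that Airflow never schedules or executes, which is why the `@task` version triggered nothing.
Instead, use dynamic task mapping so that the work to do is fetched at run time: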
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
    dag_id="controller_dag_per",
    start_date=pendulum.datetime(2024, 4, 22, tz="UTC"),
    schedule="0/5 * * * *",
    catchup=False,
    is_paused_upon_creation=False,
    max_active_runs=100,
    max_active_tasks=50,
) as dag:
    # Nothing here calls the external API while the file is parsed; the DAG
    # always consists of exactly two tasks. The API is queried inside
    # `build_triggers` when the DAG *runs*, and the `trigger` task is expanded
    # over its result (dynamic task mapping).

    def fetch_records():
        """Return the records from the external API, one per DAG run to trigger.

        Each record must be a mapping with at least ``"dag_id"`` and
        ``"my_var"`` keys (both are read by ``build_triggers``).
        """
        # TODO: replace with the real external API call.
        raise NotImplementedError("fetch_records() must call the external API")

    def build_triggers():
        """Turn the API records into kwargs for the mapped trigger task.

        Returns:
            A list with one dict per record. Each dict holds the
            ``trigger_run_id``, ``trigger_dag_id`` and ``conf`` arguments of
            one ``TriggerDagRunOperator`` instance. If the list is empty, the
            mapped ``trigger`` task is skipped.
        """
        return [
            {
                "trigger_run_id": f"{item['dag_id']}_{item['my_var']}_{index}",
                "trigger_dag_id": f"test_dag_{item['dag_id']}_dag",
                "conf": {},
            }
            for index, item in enumerate(fetch_records())
        ]

    build_triggers_task = PythonOperator(
        task_id="build_triggers",
        python_callable=build_triggers,
        # The returned list is pushed to XCom and expanded over below.
        do_xcom_push=True,
    )

    # One mapped TriggerDagRunOperator instance per dict returned above.
    # wait_for_completion=False: each instance finishes once its DAG run has
    # been created, so the controller does not wait for the triggered DAGs.
    trigger_task = TriggerDagRunOperator.partial(
        task_id="trigger",
        # NOTE(review): trigger_run_id is identical on every 5-minute run for the
        # same record/index, so reset_dag_run=True clears and re-runs the
        # existing run with that id (which may still be in progress). Confirm
        # this is intended, or make the run id unique per controller run.
        reset_dag_run=True,
        # Label each mapped instance in the UI with its run id (named mapping,
        # Airflow >= 2.9).
        map_index_template="{{ task.trigger_run_id }}",
        wait_for_completion=False,
    ).expand_kwargs(build_triggers_task.output)

    # Already implied by expanding over `build_triggers_task.output`; kept to
    # make the ordering explicit.
    build_triggers_task >> trigger_task