airflow-2.x

TriggerDagRunOperator from airflow tasks


I have a controller DAG which calls an API every 5 minutes and then triggers the corresponding DAGs.

It works and triggers the corresponding DAGs, but the controller DAG completes only after all the triggered DAGs are completed. I tried setting wait_for_completion=False, but it still waits for the triggered DAGs.

    # NOTE(review): everything below runs at the top level of the DAG file, so
    # the external API call is executed on every scheduler parse, not just when
    # the DAG runs — this is the second problem described below.
    with DAG(
        dag_id="controller_dag_per",
        start_date=pendulum.datetime(2024, 4, 22, tz="UTC"),
        schedule="0/5 * * * *",
        catchup=False,
        is_paused_upon_creation=False,
        max_active_runs=100,
        max_active_tasks=50,
    ) as dag:
        results_list = // calling an external api fetch 100 records 
        for index, item in enumerate(results_list):
            task_id, dag_id = None, None
            # NOTE(review): `index in range(len(results_list))` is always True
            # while enumerating results_list, so this guard does nothing.
            if index in range(len(results_list)):
                dag_id = f"test_dag_{item['dag_id']}_dag"
                task_id = f"{item['dag_id']}_{item['my_var']}_{index}"
            try:
                TriggerDagRunOperator(task_id=f"{task_id}", trigger_dag_id=f"{dag_id}",
                    reset_dag_run=True, wait_for_completion=False, conf={ } )
            except Exception as e:
                # NOTE(review): broad except/pass silently hides operator
                # construction errors.
                pass

Another issue with the code is that it runs at top level, so whenever Airflow parses the DAG file, the code placed directly under the DAG gets executed. I therefore moved the code inside a @task. The task then runs, but the DAGs are not triggered.

    # NOTE(review): sketch only — a TriggerDagRunOperator instantiated inside a
    # @task body executes at worker run time and is never registered with the
    # DAG, so it never actually triggers anything.
    with DAG(...)
        @task()
        def task_triggger_dags():
            for ...
                TriggerDagRunOperator(...)

        task_triggger_dags()

How can I make the controller DAG complete immediately after triggering the DAGs (which is how it is supposed to work by default)? Is there any way to trigger DAGs from a task?


Solution

  • Never run custom code at the top level of an Airflow DAG file

    The DAG file is not for you; the DAG file is for Airflow.

    You need to use dynamic task mapping to fetch the work to do at run time.

    
    import pendulum
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    with DAG(
            dag_id="controller_dag_per",
            start_date=pendulum.datetime(2024, 4, 22, tz="UTC"),
            schedule="0/5 * * * *",
            catchup=False,
            is_paused_upon_creation=False,
            max_active_runs=100,
            max_active_tasks=50,
    ) as dag:
        def build_triggers():
            """Build one kwargs dict per DAG run to trigger, at execution time.

            Because this runs inside a task (not at module level), the external
            API is only called when the controller DAG actually runs, never
            during DAG-file parsing.

            Returns:
                list[dict]: dicts whose keys ("trigger_run_id",
                "trigger_dag_id", "conf") match TriggerDagRunOperator's
                templated fields; the list is pushed to XCom and consumed by
                ``expand_kwargs`` below.
            """
            # TODO: replace the placeholders with the real external API calls.
            documents_list = "??"
            results_list = "??"

            all_triggers = []
            for index, item in enumerate(results_list):
                # Only build a trigger for entries that have a matching
                # document (idiomatic form of `index in range(len(...))`).
                if index < len(documents_list):
                    dag_id = f"test_dag_{item['dag_id']}_dag"
                    trigger_run_id = f"{item['dag_id']}_{item['my_var']}_{index}"
                    all_triggers.append({
                        "trigger_run_id": trigger_run_id,
                        "trigger_dag_id": dag_id,
                        "conf": {},
                    })

            return all_triggers


        # Runs build_triggers at execution time; do_xcom_push exposes the
        # returned list as build_triggers_task.output for dynamic task mapping.
        build_triggers_task = PythonOperator(
            task_id="build_triggers",
            python_callable=build_triggers,
            do_xcom_push=True,
        )

        # Dynamic task mapping: one mapped TriggerDagRunOperator instance per
        # dict in the XCom list. wait_for_completion=False lets the controller
        # DAG finish as soon as the downstream runs are queued.
        trigger_task = TriggerDagRunOperator.partial(
            task_id="trigger",
            reset_dag_run=True,
            # Show the triggered run id as the map index in the UI.
            map_index_template="{{ task.trigger_run_id }}",
            wait_for_completion=False,
        ).expand_kwargs(build_triggers_task.output)

        build_triggers_task >> trigger_task