Tags: python, python-3.x, airflow, directed-acyclic-graphs, airflow-scheduler

How to Trigger a DAG on the success of another DAG in Airflow using Python?


I have a Python DAG, Parent Job, and another DAG, Child Job. The tasks in the Child Job should be triggered on the successful completion of the Parent Job tasks, which run daily. How can I add an external job trigger?

MY CODE

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

# Midnight at the start of the previous calendar day (naive datetime).
# NOTE(review): this value is recomputed every time the file is parsed; Airflow's
# documentation recommends a fixed start_date - confirm whether a static date
# should be used here instead.
yesterday = datetime.combine(
    datetime.today() - timedelta(days=1),
    datetime.min.time(),
)

# Arguments applied to every task created for this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    # Recipients are configured, but both e-mail notifications are switched off.
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    # One retry, attempted five minutes after a failure.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The child DAG, scheduled once per day.
dag = DAG(
    dag_id='Child Job',
    default_args=default_args,
    schedule_interval='@daily',
)

# Single task: run a small sample query through the REDSHIFT_CONN connection.
execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag,
)

Solution

  • Answer is in this thread already. Below is demo code:

    Parent dag:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    
    # Arguments shared by every task in the parent DAG.
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 4, 29),
    }

    # Daily parent DAG; operators created inside the "with" block are attached
    # to it automatically, so no explicit dag= argument is needed.
    with DAG(
        dag_id='Parent_dag',
        default_args=default_args,
        schedule_interval='@daily',
    ) as dag:
        leave_work = DummyOperator(task_id='leave_work')
        cook_dinner = DummyOperator(task_id='cook_dinner')

        # cook_dinner runs only after leave_work has finished.
        leave_work.set_downstream(cook_dinner)
    

    Child dag:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.sensors import ExternalTaskSensor
    
    # Arguments shared by every task in the child DAG.
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 4, 29),
    }

    # Child_dag runs on its own @daily schedule, matching Parent_dag.
    dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

    # ExternalTaskSensor does not trigger this DAG. Every scheduled Child_dag run
    # starts with this sensor, which holds back the downstream tasks until the
    # cook_dinner task of the Parent_dag run with the SAME execution date has
    # succeeded.
    #
    # No execution_delta is passed: both DAGs use the same schedule and
    # start_date, so their execution dates already line up. A non-zero delta
    # (e.g. one hour) would make the sensor look for a Parent_dag run that never
    # exists, and it would wait until the timeout and fail. Only set
    # execution_delta / execution_date_fn when the two schedules differ.
    wait_for_dinner = ExternalTaskSensor(
        task_id='wait_for_dinner',
        external_dag_id='Parent_dag',
        external_task_id='cook_dinner',
        timeout=3600,  # seconds to wait for the parent task before failing
        dag=dag,  # attach explicitly so default_args (incl. start_date) apply
    )

    have_dinner = DummyOperator(
        task_id='have_dinner',
        dag=dag,
    )
    play_with_food = DummyOperator(
        task_id='play_with_food',
        dag=dag,
    )

    # Both child tasks start only after the sensor has succeeded.
    wait_for_dinner >> have_dinner
    wait_for_dinner >> play_with_food
    

    Images:

    Dags

    Dags

    Parent_dag

    Parent_dag

    Child_dag

    Child_dag