pythonairflow

How to add trigger for run dag into another dag (with decorators) in Airflow?


I got 2 dags in same dags folder:

  1. dag_update_database (dag_update_database.py)
  2. dag_add_client_loyalty (dag_updade_clients_loyalty.py)

I can run dag_add_client_loyalty with this code, but I get error:

[2024-07-29, 14:34:15 MSK] {task_command.py:423} INFO - Running <TaskInstance: dag_update_database.trigger_add_client_loyalty_dag manual__2024-07-29T11:25:59.801574+00:00 [running]> on host 4e2cf1ad7047
[2024-07-29, 14:34:15 MSK] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='d-chernovol' AIRFLOW_CTX_DAG_ID='dag_update_database' AIRFLOW_CTX_TASK_ID='trigger_add_client_loyalty_dag' AIRFLOW_CTX_EXECUTION_DATE='2024-07-29T11:25:59.801574+00:00' AIRFLOW_CTX_TRY_NUMBER='6' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-29T11:25:59.801574+00:00'
[2024-07-29, 14:34:15 MSK] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 200, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 217, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/airflow/dags/dag_update_database.py", line 794, in trigger_add_client_loyalty_dag
    trigger_dag_add_client_loyalty_task.execute(context={})
  File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/trigger_dagrun.py", line 192, in execute
    ti = context["task_instance"]
         ~~~~~~~^^^^^^^^^^^^^^^^^
KeyError: 'task_instance'

Then I get error this task runs 4 additional times. So I run my second task many times. It mean I need edit context to avoid error.

My first dag:

import datetime

from airflow.decorators import dag, task
from airflow.operators.dagrun_operator import TriggerDagRunOperator

from dag_add_client_loyalty import dag_add_client_loyalty

default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=1),
    'start_date': datetime.datetime(2024, 6, 30)
}

schedule_interval = '*/20 * * * *'

@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=4)
def dag_update_database():
    @task
    def get_salons(api, tries: int):
        ...
        return result_from_salons_api

    @task
    def get_clients(api, tries: int, result_from_salons_api: list):
        ...
        return clients
    
@task  
def trigger_add_client_loyalty_dag():       
    trigger_dag_add_client_loyalty_task = TriggerDagRunOperator(
        task_id='run_dag_add_client_loyalty',
        trigger_dag_id='dag_add_client_loyalty',
        dag=dag_update_database
    )
    trigger_dag_add_client_loyalty_task.execute(context={})

    get_salon_task = get_salons(api, 10)
    get_clients_task = get_clients(api, 10, get_salon_task)
    run_add_client_loyalty_dag_task = trigger_add_client_loyalty_dag()


    get_salon_task.set_downstream(get_clients_task)
    get_clients_task.set_downstream(run_add_client_loyalty_dag_task)

dag_update_database = dag_update_database()

Second dag code:

from airflow.decorators import dag, task

default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=1),
    'start_date': datetime.datetime(2024, 6, 30)
}

@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4)
def dag_add_client_loyalty():
    @task
    def get_salons_list():
        return result
    
    @task
    def get_records_from_api():
        return result

    get_salon_task = get_salons_list(api)
    get_records_from_api_task = get_records_from_api(api, get_salon_task)

    get_salon_task.set_downstream(get_records_from_api_task)

dag_add_client_loyalty = dag_add_client_loyalty()

Solution

  • I finally solve my problem.

    The basic problem of my first code - I didn't put context for trigger.execute. In this case trigger run next dag, but I get error:

    context["task_instance"]
    

    All I need to do is add kwarks for def trigger_add_client_loyalty_dag(**kwargs).

    Context will be put to kwarks automatically.

    from datetime import datetime, timedelta, timezone
    
    from airflow.decorators import dag, task
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    
    from dag_add_client_loyalty import dag_add_client_loyalty
    
    default_args = {
        'owner': 'd-chernovol',
        'depends_on_past': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=1),
        'start_date': datetime(2024, 6, 30)
    }
    
    schedule_interval = '*/20 * * * *'
    
    @dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=4)
    def dag_update_database():
        @task
        def get_salons(api, tries: int):
            ...
            return result_from_salons_api
    
        @task
        def get_clients(api, tries: int, result_from_salons_api: list):
            ...
            return clients
        
    @task  
    def trigger_add_client_loyalty_dag(**kwargs): # I add kwargs, kwargs contain context (airflow put context into kwarks by default)
        execution_date = timezone.make_aware(datetime.now(), timezone=timezone.utc) # add date
        context = kwargs # context to run .execute
        trigger_dag_add_client_loyalty_task = TriggerDagRunOperator(
            task_id='run_dag_add_client_loyalty'
            , trigger_dag_id='dag_add_client_loyalty'
            , execution_date=execution_date # add date
            , wait_for_completion=False # I don't want get up_for_retry in this dag, I'll check log for dag_add_client_loyalty anyway
            , dag=dag_update_database
        )
        trigger_dag_add_client_loyalty_task.execute(context=context) # add context
    
        get_salon_task = get_salons(api, 10)
        get_clients_task = get_clients(api, 10, get_salon_task)
        run_add_client_loyalty_dag_task = trigger_add_client_loyalty_dag()
    
    
        get_salon_task.set_downstream(get_clients_task)
        get_clients_task.set_downstream(run_add_client_loyalty_dag_task)
    
    dag_update_database = dag_update_database()