I have 2 DAGs in the same dags folder:
I can run dag_add_client_loyalty with this code, but I get an error:
[2024-07-29, 14:34:15 MSK] {task_command.py:423} INFO - Running <TaskInstance: dag_update_database.trigger_add_client_loyalty_dag manual__2024-07-29T11:25:59.801574+00:00 [running]> on host 4e2cf1ad7047
[2024-07-29, 14:34:15 MSK] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='d-chernovol' AIRFLOW_CTX_DAG_ID='dag_update_database' AIRFLOW_CTX_TASK_ID='trigger_add_client_loyalty_dag' AIRFLOW_CTX_EXECUTION_DATE='2024-07-29T11:25:59.801574+00:00' AIRFLOW_CTX_TRY_NUMBER='6' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-07-29T11:25:59.801574+00:00'
[2024-07-29, 14:34:15 MSK] {taskinstance.py:2728} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 444, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 414, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 241, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 200, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/python.py", line 217, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/dags/dag_update_database.py", line 794, in trigger_add_client_loyalty_dag
trigger_dag_add_client_loyalty_task.execute(context={})
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/operators/trigger_dagrun.py", line 192, in execute
ti = context["task_instance"]
~~~~~~~^^^^^^^^^^^^^^^^^
KeyError: 'task_instance'
After this error, the task retries 4 more times, so my second DAG is triggered many times. This means I need to fix the context I pass in order to avoid the error.
My first dag:
import datetime
from airflow.decorators import dag, task
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from dag_add_client_loyalty import dag_add_client_loyalty
# Default arguments applied to every task in dag_update_database.
default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 5,  # each failed task retries 5 times -- this is why the trigger fired repeatedly
    'retry_delay': datetime.timedelta(minutes=1),
    'start_date': datetime.datetime(2024, 6, 30)
}
# Cron schedule: run every 20 minutes.
schedule_interval = '*/20 * * * *'
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=4)
def dag_update_database():
    # Fetches salon data from the API; body elided in the post.
    @task
    def get_salons(api, tries: int):
        ...
        return result_from_salons_api

    # Fetches clients for the salons returned upstream; body elided in the post.
    @task
    def get_clients(api, tries: int, result_from_salons_api: list):
        ...
        return clients

    @task
    def trigger_add_client_loyalty_dag():
        trigger_dag_add_client_loyalty_task = TriggerDagRunOperator(
            task_id='run_dag_add_client_loyalty',
            trigger_dag_id='dag_add_client_loyalty',
            dag=dag_update_database
        )
        # BUG: TriggerDagRunOperator.execute() reads context["task_instance"],
        # so calling it with an empty dict raises KeyError: 'task_instance'
        # (see the traceback above). The real task context must be forwarded:
        # accept **kwargs in this task and pass them to execute().
        trigger_dag_add_client_loyalty_task.execute(context={})

    get_salon_task = get_salons(api, 10)
    get_clients_task = get_clients(api, 10, get_salon_task)
    run_add_client_loyalty_dag_task = trigger_add_client_loyalty_dag()
    get_salon_task.set_downstream(get_clients_task)
    get_clients_task.set_downstream(run_add_client_loyalty_dag_task)

dag_update_database = dag_update_database()
Second dag code:
import datetime

from airflow.decorators import dag, task
# Default arguments applied to every task in dag_add_client_loyalty.
default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': datetime.timedelta(minutes=1),
    'start_date': datetime.datetime(2024, 6, 30)
}
# schedule_interval=None: this DAG only runs when triggered by dag_update_database.
@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4)
def dag_add_client_loyalty():
    """Triggered (not scheduled) DAG that loads client-loyalty records."""

    @task
    def get_salons_list(api):
        # FIX: must accept `api` -- the original signature took no arguments,
        # but the call site below passes one (TypeError when the task runs).
        # Body elided in the post.
        ...
        return result

    @task
    def get_records_from_api(api, salons):
        # FIX: must accept the api handle and the upstream salon list -- the
        # original signature took no arguments but was called with two.
        # Body elided in the post.
        ...
        return result

    get_salon_task = get_salons_list(api)
    get_records_from_api_task = get_records_from_api(api, get_salon_task)
    get_salon_task.set_downstream(get_records_from_api_task)

dag_add_client_loyalty = dag_add_client_loyalty()
I finally solved my problem.
The basic problem with my first code: I didn't pass a context to trigger.execute. The trigger still ran the next DAG, but then I got an error on this line inside the operator:
context["task_instance"]
All I need to do is add **kwargs to the task signature: def trigger_add_client_loyalty_dag(**kwargs).
Airflow puts the task context into kwargs automatically.
from datetime import datetime, timedelta, timezone
from airflow.decorators import dag, task
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from dag_add_client_loyalty import dag_add_client_loyalty
# Default task arguments for the (fixed) dag_update_database.
default_args = {
    'owner': 'd-chernovol',
    'depends_on_past': False,
    'retries': 5,  # a failed task retries 5 times, 1 minute apart
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2024, 6, 30)
}
# Cron schedule: run every 20 minutes.
schedule_interval = '*/20 * * * *'
@dag(default_args=default_args, schedule_interval=schedule_interval, catchup=False, concurrency=4)
def dag_update_database():
    # Fetches salon data from the API; body elided in the post.
    @task
    def get_salons(api, tries: int):
        ...
        return result_from_salons_api

    # Fetches clients for the salons returned upstream; body elided in the post.
    @task
    def get_clients(api, tries: int, result_from_salons_api: list):
        ...
        return clients

    @task
    def trigger_add_client_loyalty_dag(**kwargs):
        """Trigger dag_add_client_loyalty from inside this DAG.

        Airflow injects the task context into **kwargs, so forwarding kwargs
        to .execute() supplies the context["task_instance"] entry that
        TriggerDagRunOperator requires (its absence caused the KeyError).
        """
        # FIX: datetime.timezone has no make_aware() -- that helper lives in
        # airflow.utils.timezone, so timezone.make_aware(...) raised
        # AttributeError here. Build an aware UTC datetime directly instead.
        execution_date = datetime.now(timezone.utc)
        trigger_dag_add_client_loyalty_task = TriggerDagRunOperator(
            task_id='run_dag_add_client_loyalty',
            trigger_dag_id='dag_add_client_loyalty',
            execution_date=execution_date,
            # Don't block until the triggered run finishes (avoids this task
            # sitting in up_for_retry); its own logs are checked separately.
            wait_for_completion=False,
            dag=dag_update_database
        )
        # Forward the real task context instead of an empty dict.
        trigger_dag_add_client_loyalty_task.execute(context=kwargs)

    get_salon_task = get_salons(api, 10)
    get_clients_task = get_clients(api, 10, get_salon_task)
    run_add_client_loyalty_dag_task = trigger_add_client_loyalty_dag()
    get_salon_task.set_downstream(get_clients_task)
    get_clients_task.set_downstream(run_add_client_loyalty_dag_task)

dag_update_database = dag_update_database()