I have a question about the TriggerDagRunOperator, specifically the wait_for_completion parameter.
Before moving to Airflow 2.2, we used this operator to trigger another DAG and an ExternalTaskSensor to wait for its completion.
In Airflow 2.2 there is a new parameter called wait_for_completion that, if set to True, will make the task complete only when the triggered DAG has completed.
This is great, but I was wondering whether the worker will be released between pokes or not. I know that the ExternalTaskSensor used to have a reschedule mode that you could use for poke intervals longer than 1 minute, which releases the worker slot between pokes - but I don't see it in the documentation anymore.
My question is whether the wait_for_completion parameter causes the operator to release the worker between pokes. From looking at the code I don't think that is the case, so I just want to verify.
If it isn't releasing the worker and the triggered DAG is bound to take more than 1 minute to finish, what would be the best approach here?
We are using MWAA with Airflow 2.2, so I guess deferrable operators are not an option (if they are even a solution in this case).
For Apache-Airflow>=2.6.0:
Assuming you have a triggerer process running, you can use deferrable operators. The operator will defer (not occupy a worker slot) when needed, according to the logic of the specific operator. To set it up, simply use:
TriggerDagRunOperator(..., deferrable=True)
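For example, a minimal sketch of a deferrable setup (the dag_a/dag_b IDs, the date, and the interval are illustrative assumptions; a triggerer process must be running):

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# "dag_a", "dag_b" and the interval below are assumptions for illustration.
with DAG("dag_a", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    TriggerDagRunOperator(
        task_id="trigger_dag_b",
        trigger_dag_id="dag_b",
        wait_for_completion=True,  # deferral applies while waiting for dag_b
        deferrable=True,           # hand the wait off to the triggerer process
        poke_interval=60,          # seconds between state checks while deferred
    )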
For Apache-Airflow<2.6.0:
When using wait_for_completion=True in TriggerDagRunOperator, the worker will not be released as long as the operator is running. You can see that in the operator implementation: the operator uses time.sleep(self.poke_interval) in a loop.
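For reference, the waiting logic boils down to a loop like the following paraphrased sketch (not the verbatim Airflow source; self and dag_run belong to the operator's execute() context):

# Paraphrased from TriggerDagRunOperator.execute() when wait_for_completion=True.
while True:
    time.sleep(self.poke_interval)   # blocks the worker slot for the entire wait
    dag_run.refresh_from_db()        # re-read the triggered run's current state
    state = dag_run.state
    if state in self.failed_states:
        raise AirflowException(f"{self.trigger_dag_id} failed with state {state}")
    if state in self.allowed_states:
        return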
As you pointed out, there are two ways to achieve the goal of verifying that the triggered DAG completed:
1. TriggerDagRunOperator followed by ExternalTaskSensor
2. TriggerDagRunOperator with wait_for_completion=True
However, other than the resources issue you mentioned, the two options are not really equivalent.
In option 1, if the triggered DAG fails then the ExternalTaskSensor will fail.
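If worker slots are the concern on Apache-Airflow<2.6.0, option 1 can use the sensor's reschedule mode, which releases the slot between pokes. A minimal sketch (the DAG IDs, date, and intervals are illustrative assumptions):

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Hypothetical DAG/task IDs and intervals, for illustration only.
with DAG("dag_a", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    trigger = TriggerDagRunOperator(
        task_id="trigger_dag_b",
        trigger_dag_id="dag_b",
        execution_date="{{ execution_date }}",  # align the run the sensor waits on
    )

    wait = ExternalTaskSensor(
        task_id="wait_for_dag_b",
        external_dag_id="dag_b",
        external_task_id=None,   # None = wait for the whole DAG run to succeed
        mode="reschedule",       # release the worker slot between pokes
        poke_interval=60,
    )

    trigger >> wait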
In option 2, consider:
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

my_op = TriggerDagRunOperator(
    task_id='task',
    trigger_dag_id="dag_b",
    ...,
    wait_for_completion=True,
    retries=2,
)
If dag_b fails, then TriggerDagRunOperator will retry, which will invoke another DagRun of dag_b.
Both options are valid. You need to decide which behavior is suitable for your use case.