I have an Airflow pipeline that starts with a FileSensor that may perform a number of retries (which makes sense because the producing process sometimes takes longer, and sometimes simply fails).
However, when I restart the pipeline, it runs in catch-up mode, and the retries in the FileSensor become spurious: if the file isn't there for a previous day, it won't materialize anymore.
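Roughly, the relevant part of my DAG looks like this (simplified; the paths, schedule, and retry settings are just illustrative, and the import paths assume a recent Airflow 2.x):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="file_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,
) as dag:
    # Waits for the daily file; retries because the producer is sometimes late
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/report_{{ ds }}.csv",  # hypothetical path
        poke_interval=300,
        timeout=1800,
        retries=5,
        retry_delay=timedelta(minutes=30),
    )

    process = BashOperator(
        task_id="process",
        bash_command="echo 'process the file here'",
    )

    wait_for_file >> process
```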
Therefore my question: is it possible to make the behavior of a DAG run contingent on whether it is a catch-up run or a regularly scheduled run?
My apologies if this is a duplicate question: it seems a rather basic problem, but I couldn't find previous questions or documentation.
The solution is rather simple.
Set a LatestOnlyOperator upstream from the FileSensor. Then set an operator of any type you may need downstream from the FileSensor, with its trigger rule set to TriggerRule.ALL_DONE.
Both the skipped and success states count as "done" states, so in a regularly scheduled run the downstream task runs once the FileSensor has finished, while in a catch-up run the LatestOnlyOperator skips the FileSensor and the downstream task starts right away. Note that under ALL_DONE a failed sensor also counts as "done"; if the downstream task should only run when the sensor succeeded or was skipped, use TriggerRule.NONE_FAILED instead.
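Put together, a minimal sketch of that layout could look like the following (task ids, paths, and retry settings are illustrative; import paths assume a recent Airflow 2.x):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="file_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=True,
) as dag:
    # Skips its downstream tasks for every run that is not the latest one
    latest_only = LatestOnlyOperator(task_id="latest_only")

    # Only polled in the latest run; skipped in catch-up runs
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/incoming/report_{{ ds }}.csv",  # hypothetical path
        poke_interval=300,
        retries=5,
        retry_delay=timedelta(minutes=30),
    )

    # ALL_DONE: runs once the sensor has finished, including when it was skipped
    # (TriggerRule.NONE_FAILED would additionally block on a failed sensor)
    process = BashOperator(
        task_id="process",
        bash_command="echo 'process the file here'",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    latest_only >> wait_for_file >> process
```

In the latest run, latest_only succeeds and the sensor polls as usual; in every catch-up run, latest_only skips the sensor, and process still runs because of its trigger rule.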