airflow

Can I programmatically determine if an Airflow DAG was scheduled or manually triggered?


I want to create a snippet that passes the correct date depending on whether the DAG was scheduled or triggered manually. The DAG runs monthly and generates a report (a SQL query) based on the previous month's data.

If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:

execution_date.month

Given that the DAG is scheduled at the end of the previous period (last month), execution_date correctly returns last month. However, on manual runs this returns the current month (execution_date is the date of the manual trigger).

I want to write a simple macro that deals with this case. However, I could not find a good way to programmatically check whether the DAG was triggered manually. The best I could come up with is to fetch the run_id from the database (by creating a macro that has a DB session) and check whether the run_id contains the word manual. Is there a better way to solve this problem?


Solution

  • tl;dr: You can determine this with DagRun.external_trigger.


    I noticed that in the Tree View, there's an outline around runs that are scheduled, but not around manual ones. That's because manual runs have stroke-opacity: 0; applied in CSS.

    Searching the repo for this, I found how the Airflow devs detect manual runs (the line is 5 years old, so it should work in older versions as well):

    .style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
    

    Searching for external_trigger brings us to the DagRun definition.

    So if you were using, for example, a Python callback, you can have something like this (can be defined in the DAG, or a separate file):

    def my_fun(context):
        if context.get('dag_run').external_trigger:
            print('manual run')
        else:
            print('scheduled run')
    

    Then pass it to your operator, here as on_failure_callback (which is only called when the task fails):

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        on_failure_callback=my_fun,
        dag=dag,
    )
    

    I have tested something similar and it works.

    I think you can also do something like if {{ dag_run.external_trigger }}:, but I haven't tested this, and I believe it would only work in that DAG's file.
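
    To tie this back to the question, here is a minimal sketch of the date logic, assuming the task context exposes dag_run and execution_date as described above. The names report_month and report_month_from_context are hypothetical helpers, not part of Airflow, and I have not run this inside a DAG. The core function takes plain values, so it can be checked without an Airflow installation:

```python
from datetime import datetime


def report_month(execution_date, external_trigger):
    """Return (year, month) of the month the report should cover.

    Hypothetical helper, not an Airflow API.
    - Scheduled run: execution_date already falls in the previous period,
      so its year and month are used as-is.
    - Manual run: execution_date is the trigger time, so step back one month.
    """
    if not external_trigger:
        return execution_date.year, execution_date.month
    if execution_date.month == 1:
        # A manual run in January reports on December of the previous year.
        return execution_date.year - 1, 12
    return execution_date.year, execution_date.month - 1


def report_month_from_context(context):
    """Assumes context carries 'dag_run' and 'execution_date' keys."""
    dag_run = context["dag_run"]
    return report_month(context["execution_date"], dag_run.external_trigger)


if __name__ == "__main__":
    print(report_month(datetime(2020, 3, 1), external_trigger=False))
    print(report_month(datetime(2020, 4, 15), external_trigger=True))
```

    Both calls in the example describe a report for March 2020: the first is a scheduled run whose execution_date is already in March, the second a manual trigger in mid-April. If you want this in a template rather than a callback, a function like this could presumably be registered through the DAG's user_defined_macros argument, but I haven't tested that either.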