Tags: dependencies, airflow, task, repeat, directed-acyclic-graphs

Configure a task to repeat on every change to upstream tasks


I am designing a DAG which processes several data sources and outputs a report. The DAG, as a whole, is triggered daily. Schematically:

src1 --┐
src2 --┤
...  --┼---> report
srcN --┘

By default, a task waits for all upstream tasks to finish successfully. In my case, the source tasks are triggered at different times and have non-uniform runtimes. Yet, I would like to have a report, outside Airflow, that reflects the up-to-date state of the system. For this reason, I want to trigger the report task multiple times - initially when the DAG run starts (to create a placeholder report), and again whenever each of the src tasks is done.

For the purposes of the question, assume all the report task does is pull the status of all upstream tasks and store them in a file, in the form of src1: {N/A, success, failure} entries.
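For concreteness, here is a minimal sketch of such a report task, assuming a PythonOperator, hypothetical source task ids src1..srcN, and a hypothetical output path:

    from airflow.operators.python import PythonOperator

    SOURCE_TASK_IDS = ["src1", "src2", "srcN"]  # hypothetical task ids

    def write_report(dag_run=None, **_):
        # Pull the state of each source task instance from the current DAG run;
        # tasks that have not run yet are reported as N/A.
        lines = []
        for task_id in SOURCE_TASK_IDS:
            ti = dag_run.get_task_instance(task_id)
            state = ti.state if ti is not None and ti.state else "N/A"
            lines.append(f"{task_id}: {state}")
        with open("/tmp/report.txt", "w") as fh:  # hypothetical output path
            fh.write("\n".join(lines) + "\n")

    report = PythonOperator(task_id="report", python_callable=write_report)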

To achieve this outside of Airflow, I'd likely set up a pub-sub channel between the source tasks and the report, have each task publish a notification when it's done (regardless of outcome), and have the reporting service, which subscribes to these notifications, run whenever it is notified.

One solution I had in mind was to create a periodically running task, in a separate DAG, that looks for output files generated by each of the src tasks individually, assigning N/A when a file is missing. This achieves a UX similar to an auto-updating report. The report task is then effectively disconnected from this DAG, which becomes a collection of unconnected task{group}s. A sketch of this approach follows.
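A rough sketch of that polling task, under the assumption that each src task writes a marker file to a hypothetical /data/out/ directory:

    import os

    from airflow.operators.python import PythonOperator

    SOURCES = ["src1", "src2", "srcN"]  # hypothetical source names

    def refresh_report(**_):
        # Report success for sources whose marker file exists, N/A otherwise.
        lines = []
        for src in SOURCES:
            done = os.path.exists(f"/data/out/{src}.done")  # hypothetical marker file
            lines.append(f"{src}: {'success' if done else 'N/A'}")
        with open("/tmp/report.txt", "w") as fh:  # hypothetical output path
            fh.write("\n".join(lines) + "\n")

    refresh = PythonOperator(task_id="refresh_report", python_callable=refresh_report)

Note that this file-based variant cannot distinguish failure from not-yet-run unless the source tasks also write failure markers.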

Several questions:

  1. Is the workflow I have in mind suitable for Airflow?
  2. Assuming this is doable, how should I configure the DAG itself and/or the report task to achieve what I want? I considered the various trigger rules for report, but these seem to only trigger report once.
  3. To create a placeholder report, I thought to add a dummy task which finishes immediately and causes report to run via whichever mechanism is chosen; report would then ignore the dummy task in its output. Is there a more straightforward way to achieve this?

Solution

  • One way to achieve this would be to use at least 2 DAGs:

    A code sample would look like this:

    with DAG(dag_id="producer", ...):
        for src in [src1, src2, src3]:
            producer = BashOperator(task_id="producer_"+src.name, ...)
            refresh = TriggerDagRunOperator(
                task_id="refresh_from_"+src.name,
                trigger_dag_id="consumer",
                trigger_run_id="",
                reset_dag_run=True,
                 ...)
            producer >> refresh
    
    with DAG(dag_id="consumer", ...):
        BashOperator(task_id="build_or_refresh_report", ...)
        ...
    

    As a good practice, you should provide an appropriate trigger_run_id to the TriggerDagRunOperator in order to reuse the same DagRun object; otherwise each trigger creates a new "manual" dag_run.

    This respects idempotency principles and lowers the number of DAG runs stored in the database. In general, DAGs should have a defined schedule, with each run covering a specific period of time.

    If you use the same trigger_run_id and want your report to be executed multiple times, you must set reset_dag_run=True.

    It is easier to derive the trigger_run_id if your source DAG(s) use the same schedule as the report DAG, but this is not mandatory.
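    A hedged sketch of such a deterministic run id, relying on the fact that trigger_run_id is a templated field of TriggerDagRunOperator (the id scheme itself is an assumption):

    refresh = TriggerDagRunOperator(
        task_id="refresh_from_src1",
        trigger_dag_id="consumer",
        trigger_run_id="report_{{ ds }}",  # one run id per logical date (assumed scheme)
        reset_dag_run=True,                # required to re-run an existing run id
    )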

    As stated in RNHTTR's answer, the idiomatic way to achieve your goal should be the Dataset feature, but it is a relatively recent addition and its possibilities are limited at the moment.
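    For reference, a minimal Dataset-based sketch (Airflow 2.4+), with hypothetical file URIs. Note that scheduling on a plain list of datasets only fires once all of them have been updated; per-update ("any") semantics require the newer conditional dataset scheduling, which is part of the limitation mentioned above:

    import pendulum

    from airflow import DAG, Dataset
    from airflow.operators.bash import BashOperator

    sources = [Dataset(f"file:///data/out/{name}") for name in ["src1", "src2", "src3"]]

    # Each producer task declares the dataset it updates as an outlet.
    with DAG(dag_id="producer", schedule="@daily",
             start_date=pendulum.datetime(2023, 1, 1, tz="UTC")):
        for name, ds in zip(["src1", "src2", "src3"], sources):
            BashOperator(task_id=f"producer_{name}", bash_command="...", outlets=[ds])

    # The consumer runs when its input datasets are updated (all of them, with a plain list).
    with DAG(dag_id="consumer", schedule=sources,
             start_date=pendulum.datetime(2023, 1, 1, tz="UTC")):
        BashOperator(task_id="build_or_refresh_report", bash_command="...")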
