Tags: airflow, airflow-2.x

How to Write a DAG with Multiple Similar Tasks


I'm trying to move data from 50 tables in Postgres to BigQuery via Airflow. Each table follows the same 4 operations, just on different data:

get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

What's the cleanest way to repeat these operations for 50 tables?

Some things I've considered:

1. Hard-coding a separate chain for every table:

get_latest_timestamp_table1 >> copy_data_to_bigquery_table1 >> verify_bigquery_data_table1 >> delete_postgres_data_table1
get_latest_timestamp_table2 >> copy_data_to_bigquery_table2 >> verify_bigquery_data_table2 >> delete_postgres_data_table2
...

2. Generating the chains in a loop, roughly like this (fleshed out in the sketch below):

for table in table_names:
    get_latest_timestamp = {PythonOperator with tablename as an input}
    ...
    get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
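Fleshed out, the loop version would look something like this (just a sketch; fetch_latest_timestamp, copy_table_to_bq, and table_names are placeholders I haven't actually written):

from airflow.operators.python import PythonOperator

for table in table_names:
    # unique task_id per table; the table name is passed to the callable
    get_latest_timestamp = PythonOperator(
        task_id=f'get_latest_timestamp_{table}',
        python_callable=fetch_latest_timestamp,  # placeholder callable
        op_kwargs={'table_name': table},
    )
    copy_data_to_bigquery = PythonOperator(
        task_id=f'copy_data_to_bigquery_{table}',
        python_callable=copy_table_to_bq,  # placeholder callable
        op_kwargs={'table_name': table},
    )
    # ...verify_bigquery_data and delete_postgres_data built the same way...
    get_latest_timestamp >> copy_data_to_bigquery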

Any other ideas? I'm pretty new to Airflow, so I'm not sure what the best practices are for repeating similar operations.

I tried copy/pasting each task (50*4 = 200 tasks) into a single DAG. It works, but it's ugly.


Solution

  • To avoid code duplication you can use TaskGroups; they are well described in the Airflow documentation.

    from airflow.operators.empty import EmptyOperator
    from airflow.utils.task_group import TaskGroup

    # one group wrapping one task chain per table; task_ids include the
    # table name so they stay unique within the group
    with TaskGroup(group_id='process_tables') as process_tables:
        for table in table_names:
            get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
            copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
            verify_bigquery_data = EmptyOperator(task_id=f'{table}_verify')
            delete_postgres_data = EmptyOperator(task_id=f'{table}_delete')
            get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data
    

    When you pull XComs, task IDs inside a TaskGroup are prefixed with the group ID, so you reference a grouped task like this:

    process_tables.table1_to_bq
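    For example, a minimal sketch of pulling such a value in a downstream task, assuming the real {table}_timestamp task pushes an XCom (the EmptyOperator stubs above don't):

    from airflow.operators.python import PythonOperator

    def read_timestamp(ti):
        # task IDs inside a TaskGroup are prefixed with the group_id
        return ti.xcom_pull(task_ids='process_tables.table1_timestamp')

    read_latest = PythonOperator(
        task_id='read_latest',
        python_callable=read_timestamp,
    )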

    Combining the task group with other tasks would look like this:

    start >> process_tables >> end
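    Putting it all together, a self-contained sketch of the whole pattern (the DAG arguments and table list are placeholders, and EmptyOperator stands in for the real operators):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.task_group import TaskGroup

    table_names = ['table1', 'table2']  # placeholder; list all 50 tables here

    with DAG(
        dag_id='postgres_to_bigquery',  # hypothetical name
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        start = EmptyOperator(task_id='start')
        end = EmptyOperator(task_id='end')

        with TaskGroup(group_id='process_tables') as process_tables:
            for table in table_names:
                # one independent four-task chain per table, all inside one group
                get_latest_timestamp = EmptyOperator(task_id=f'{table}_timestamp')
                copy_data_to_bigquery = EmptyOperator(task_id=f'{table}_to_bq')
                verify_bigquery_data = EmptyOperator(task_id=f'{table}_verify')
                delete_postgres_data = EmptyOperator(task_id=f'{table}_delete')
                get_latest_timestamp >> copy_data_to_bigquery >> verify_bigquery_data >> delete_postgres_data

        start >> process_tables >> end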