
Airflow - Dynamic Tasks and Downstream Dependencies


I inherited the following DAG, which is running on AWS MWAA v2.2.2:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id='snowflake'

sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
 ]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:

    t0 = DummyOperator(task_id = 'start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
            )

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id = 'end')

    t0 >> external_tables_from_s3 >> t1

What is the best way to set up this DAG so that the external_tables_from_s3 tasks can be run in parallel?

Basically, I want something like:

t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1

I was wondering what the best way is to achieve this without having to specify each task individually. The sources list is a lot bigger than this and has just been trimmed down for this question.


Solution

  • There are a couple of options, depending on how you want to visualize the DAG in the UI:

    1. Use a list to contain all of the SnowflakeOperator tasks, or
    2. Use a TaskGroup.

    Option 1:

    from pendulum import datetime
    
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
    
    
    snowflake_conn_id = "snowflake"
    
    sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]
    
    # define the DAG
    with DAG(
        "test_dag",
        default_args={
            "owner": "test",
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
        },
        description="test_dag test description.",
        start_date=datetime(2022, 2, 1),
        schedule_interval=None,
        max_active_runs=1,
    ) as dag:
    
        t0 = DummyOperator(task_id="start")
    
        snowflake_tasks = []
        for source in sources:
            create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
    
            external_tables_from_s3 = SnowflakeOperator(
                task_id=f"create_external_table_for_{source['name']}",
                sql=create_table_sql,
                snowflake_conn_id=snowflake_conn_id,
            )
            snowflake_tasks.append(external_tables_from_s3)
    
        t1 = DummyOperator(task_id="end")
    
        t0 >> snowflake_tasks >> t1
    

    (Graph view: start fans out to the individual create_external_table_for_* tasks, which fan back in to end.)
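
    If the final dependency line itself becomes awkward as the list grows, chain() from airflow.models.baseoperator expresses the same relationships as the last line of Option 1; a minimal sketch:

    from airflow.models.baseoperator import chain

    # equivalent to t0 >> snowflake_tasks >> t1: "start" fans out to every
    # task in the list, and every task in the list fans in to "end"
    chain(t0, snowflake_tasks, t1)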

    Option 2:

    from pendulum import datetime
    
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
    from airflow.utils.task_group import TaskGroup
    
    
    snowflake_conn_id = "snowflake"
    
    sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]
    
    # define the DAG
    with DAG(
        "test_dag",
        default_args={
            "owner": "test",
            "depends_on_past": False,
            "email_on_failure": False,
            "email_on_retry": False,
        },
        description="test_dag test description.",
        start_date=datetime(2022, 2, 1),
        schedule_interval=None,
        max_active_runs=1,
    ) as dag:
    
        t0 = DummyOperator(task_id="start")
    
        with TaskGroup(group_id="snowflake_tasks") as snowflake_tasks:
            for source in sources:
                create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
    
                external_tables_from_s3 = SnowflakeOperator(
                    task_id=f"create_external_table_for_{source['name']}",
                    sql=create_table_sql,
                    snowflake_conn_id=snowflake_conn_id,
                )
    
        t1 = DummyOperator(task_id="end")
    
        t0 >> snowflake_tasks >> t1
    
    

    (Graph view: the same parallel tasks, grouped under a snowflake_tasks TaskGroup between start and end.)
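
    Note that how many of these tasks actually run at the same time is still bounded by the environment (executor slots, pools) and the DAG-level concurrency setting. As a sketch only, with 10 as an illustrative value rather than a recommendation, the per-DAG-run cap can be set with max_active_tasks on the DAG constructor (available in Airflow 2.2+):

    from pendulum import datetime

    from airflow import DAG

    with DAG(
        "test_dag",
        description="test_dag test description.",
        start_date=datetime(2022, 2, 1),
        schedule_interval=None,
        max_active_runs=1,
        max_active_tasks=10,  # illustrative cap on concurrently running tasks per DAG run
    ) as dag:
        ...  # same tasks as in the examples above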