I inherited the following DAG, which is running on AWS MWAA v2.2.2:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id = 'snowflake'

sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:

    t0 = DummyOperator(task_id='start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
        )

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id='end')

    t0 >> external_tables_from_s3 >> t1
What is the best way to set up this DAG so that the external_tables_from_s3 tasks can run in parallel?

Basically, I want something like:

t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1

I was wondering what the best way is to achieve this without having to specify each task individually. The sources list is a lot bigger than this and has just been trimmed down for this question.
There are a couple of options, depending on how you want to visualize the DAG in the UI: collect the SnowflakeOperator tasks in a list, or group them in a TaskGroup.

Option 1: list of SnowflakeOperator tasks
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]
# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:

    t0 = DummyOperator(task_id="start")

    snowflake_tasks = []
    for source in sources:
        create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id,
        )
        snowflake_tasks.append(external_tables_from_s3)

    t1 = DummyOperator(task_id="end")

    t0 >> snowflake_tasks >> t1
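The bitshift operators accept a list of tasks on either side, so t0 >> snowflake_tasks >> t1 fans out to every task collected in the list. With the trimmed-down sources list it is equivalent to the pattern you described:

t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1

Each create-table task is downstream of start and upstream of end only, so they run in parallel up to whatever concurrency and pool limits your environment allows.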
Option 2: TaskGroup
from pendulum import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.task_group import TaskGroup
snowflake_conn_id = "snowflake"
sources = [{"name": "test_1", "path": "test1/path/"}, {"name": "test_2", "path": "test2/path"}]
# define the DAG
with DAG(
    "test_dag",
    default_args={
        "owner": "test",
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
    },
    description="test_dag test description.",
    start_date=datetime(2022, 2, 1),
    schedule_interval=None,
    max_active_runs=1,
) as dag:

    t0 = DummyOperator(task_id="start")

    with TaskGroup(group_id="snowflake_tasks") as snowflake_tasks:
        for source in sources:
            create_table_sql = f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} with location = @stage_dev/{source['path']} auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"

            external_tables_from_s3 = SnowflakeOperator(
                task_id=f"create_external_table_for_{source['name']}",
                sql=create_table_sql,
                snowflake_conn_id=snowflake_conn_id,
            )

    t1 = DummyOperator(task_id="end")

    t0 >> snowflake_tasks >> t1
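Functionally this is the same fan-out: setting the dependency on the TaskGroup wires start and end to every task inside the group, so the create-table tasks still run in parallel. The difference is how the Graph view renders it, with the tasks collapsed under snowflake_tasks; note that with the default prefix_group_id=True their task_ids become snowflake_tasks.create_external_table_for_test_1 and so on.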