I want to run a set of tasks like this:
a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]
First run task a, when that is done, run b,c,d in parallel, then when the last of b,c,d is done. start running e,f,g in parallel etc.
But i'm getting an error with unsupported operand type(s) for >>: 'list' and 'list'
what is the correct syntax for what I want to do?
The error you are getting is related to the fact that dependencies between lists using bitwise operator are not supported, [task_a, task_b] >> [task_c, task_d]
won't work.
IMHO the easiest and cleaner way to achieve what you are looking for (there are others) is to use TaskGroup
and set depenencies between them, like this:
Graph view:
from time import sleep
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup
default_args = {
'start_date': days_ago(1)
}
def _execute_task(**kwargs):
print(f"Task_id: {kwargs['ti'].task_id}")
sleep(10)
def _create_python_task(name):
return PythonOperator(
task_id=f'task_{name}',
python_callable=_execute_task)
with DAG('parallel_tasks_example', schedule_interval='@once',
default_args=default_args, catchup=False) as dag:
task_a = DummyOperator(task_id='task_a')
with TaskGroup('first_group') as first_group:
for name in list('bcd'):
task = _create_python_task(name)
with TaskGroup('second_group') as second_group:
for name in list('efg'):
task = _create_python_task(name)
with TaskGroup('third_group') as third_group:
for name in list('hijk'):
task = _create_python_task(name)
task_a >> first_group >> second_group >> third_group
From TaskGroup
class definition:
A collection of tasks. When set_downstream() or set_upstream() are called on the TaskGroup, it is applied across all tasks within the group if necessary.
You can find an official example about here .