I'm learning the Airflow TaskFlow API and I'm struggling with the following problem:
I'm trying to set up a dependency between a FileSensor and a @task function: I want my download() task to run only after my FileSensor succeeds.
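from airflow.decorators import dag
from airflow.sensors.filesystem import FileSensor

# download, transform, show and upload are @task functions defined elsewhere (not shown)

@dag()
def my_dag():
    if_file_exists = FileSensor(
        task_id="file_sensor",
        filepath=***,
        fs_conn_id=***,
        poke_interval=***,
    )
    if_file_exists >> download
    show(transform(download())) >> upload()

my_dag()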
And then I get this error:
Broken DAG: [/opt/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 230, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'function' object has no attribute 'update_relative'
I read the article in the Airflow docs, but unfortunately I couldn't get past this error.
Presumably download is a TaskFlow (@task) function, but in if_file_exists >> download the function is never actually called, so download is just a plain Python function object, which has no update_relative method. If you change the dependency to if_file_exists >> download(), you shouldn't get the AttributeError.
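A toy model may make the failure concrete. The Node class and task decorator below are hypothetical stand-ins, not Airflow's real operator, XComArg or @task implementation: as in the traceback, a >> b asks the right-hand object to run update_relative, and the toy decorator only produces a node when the function is actually called.

```python
import functools


class Node:
    """Hypothetical stand-in for an Airflow operator or XComArg (not Airflow code)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []
        self.downstream = []

    def update_relative(self, other):
        # Called on the right-hand side of `a >> b`: record `a` as upstream of `b`.
        self.upstream.append(other)

    def __rshift__(self, other):
        # Like the call in the traceback, `a >> b` relies on b.update_relative existing.
        other.update_relative(self)
        self.downstream.append(other)
        return other


def task(func):
    """Toy @task decorator: only *calling* the wrapped function produces a Node."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return Node(func.__name__)

    return wrapper


@task
def download():
    pass


sensor = Node("file_sensor")

try:
    sensor >> download  # bare function object: never called
except AttributeError as exc:
    error_message = str(exc)

node = sensor >> download()  # calling it returns a Node, so the edge is created

print(error_message)  # 'function' object has no attribute 'update_relative'
print(node.task_id)   # download
```

The bare function reproduces the same AttributeError message as the DAG import, while the called version yields an object that knows how to take part in a dependency.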
That said, if you want the "download" task to appear only once in your DAG, assign the result of calling download() to a variable and reference that variable instead. Otherwise, every call to download() adds a new node to the DAG.
_download = download()
if_file_exists >> _download
show(transform(_download)) >> upload()
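Why the variable matters can be sketched with another toy model (hypothetical, not Airflow's implementation): each call of a decorated function registers a new task, and repeated ids get a suffix loosely modelled on the download__1 style Airflow uses. Calling download() at both use sites leaves the sensor gating one copy while transform consumes another; calling it once and reusing the result keeps a single node.

```python
import functools

# Hypothetical stand-in for the DAG's list of task ids (not Airflow code).
registered = []


def task(func):
    """Toy @task decorator: every call registers a NEW task and returns its id."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        base = func.__name__
        # Count earlier copies and suffix repeats (download, download__1, ...).
        copies = sum(1 for t in registered if t.split("__")[0] == base)
        task_id = base if copies == 0 else f"{base}__{copies}"
        registered.append(task_id)
        return task_id

    return wrapper


@task
def download():
    pass


# Pattern 1: download() called at each use site -> two separate tasks,
# and the sensor only gates the first one.
edges_1 = [("file_sensor", download()), (download(), "transform")]

# Pattern 2: call once, reuse the result -> one task, gated by the sensor.
registered.clear()
_download = download()
edges_2 = [("file_sensor", _download), (_download, "transform")]

print(edges_1)  # [('file_sensor', 'download'), ('download__1', 'transform')]
print(edges_2)  # [('file_sensor', 'download'), ('download', 'transform')]
```

In the first pattern the copy feeding transform has no edge from the sensor, which is usually not what you want; reusing _download ties both the sensor dependency and the transform input to the same task.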