
Airflow TaskFlow API: connecting a FileSensor to a @task


I'm learning the Airflow TaskFlow API and I'm stuck on the following problem.

I'm trying to set a dependency between a FileSensor and a @task-decorated function: I want my download() task to run only after the FileSensor succeeds.

@dag()
def my_dag():

    if_file_exists = FileSensor(
            task_id="file_sensor",
            filepath=***, 
            fs_conn_id=***,
            poke_interval=***,
        )

    if_file_exists >> download
    show(transform(download())) >> upload()

And then I get this error:

Broken DAG: [/opt/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 230, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 175, in _set_relatives
    task_object.update_relative(self, not upstream)
AttributeError: 'function' object has no attribute 'update_relative'

I read the relevant section of the Airflow docs, but unfortunately I couldn't get past this error:

https://airflow.apache.org/docs/apache-airflow/2.3.0/tutorial_taskflow_api.html#adding-dependencies-between-decorated-and-traditional-tasks


Solution

  • Presumably download is a TaskFlow (@task-decorated) function. In if_file_exists >> download the function is never called, so the right-hand side is a plain function object rather than a task, which is why the AttributeError is raised. If you change the dependency to if_file_exists >> download(), the error should go away.

    However, each call to download() adds a new node to the DAG. If you want the "download" task to appear only once, assign the result of the call to a variable and reference that variable everywhere:

    _download = download()
    if_file_exists >> _download
    show(transform(_download)) >> upload()