Tags: airflow, directed-acyclic-graphs, airflow-2.x

Create custom Apache Airflow FileSensor that retries after sensor timeout


I want my FileSensor to check a particular directory for a file. If the file doesn't exist, I want it to create the file and then look for it again. On the second check the file exists, so the DAG should proceed to the next task. To do this, I came up with the following code:

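from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago

# Despite the name, this is the full path of the file the sensor waits for.
FILE_FOLDER = "/mnt/c/path/to/file"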

testDag = DAG(
    dag_id="orchestrator-dag",
    schedule_interval='@daily',
    start_date=days_ago(1)
)

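def create_file_on_retry(context):
    # Airflow calls on_retry_callback with the task context as its only argument.
    text = '''Some random data\n'''

    # write the data to the file the sensor is watching
    with open(FILE_FOLDER, 'w') as f:
        f.write(text)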


file_sensor_task = FileSensor(
    task_id="check-for-file",
    filepath=FILE_FOLDER,
    poke_interval=5,
    timeout=30,
    on_retry_callback=create_file_on_retry,
    retries=2,
    dag=testDag
)

However, my task kept failing, and that was when I came across this article, which states:

If a sensor times out, it will not retry. Previously, a sensor is retried when it times out until the number of retries are exhausted. So the effective timeout of a sensor is timeout * (retries + 1). This behaviour is now changed. A sensor will immediately fail without retrying if timeout is reached. If it’s desirable to let the sensor continue running for longer time, set a larger timeout instead.

This change was introduced in Airflow 2.2, and I am currently using Airflow 2.8.1. What I would like to do is create a custom BaseSensorOperator that lets me create the required file on retry. I am looking for help with the BaseSensorOperator code, or for an existing solution that already does this.


Solution

  • It sounds like what you want isn't really a sensor. The essence of a sensor is to wait for something to become true through some external force. Do you actually want to wait the 30 seconds for the file to come into existence, or did you just pick a relatively short timeout?

    It sounds like you'd be best served by simply writing a PythonOperator that checks for the existence of the file and, if it doesn't exist, immediately creates the file and returns success.
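
A minimal sketch of the callable such a PythonOperator could run. The name ensure_file and its default text are hypothetical, chosen only for illustration; the function creates the file only when it is missing and reports whether it had to.

```python
from pathlib import Path


def ensure_file(path: str, text: str = "Some random data\n") -> bool:
    """Create `path` containing `text` if it is missing.

    Returns True if the file was created, False if it already existed.
    """
    target = Path(path)
    if target.exists():
        # Nothing to do: the file is already there.
        return False
    # Make sure the parent directory exists before writing the file.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(text)
    return True
```

With the DAG from the question, this could be wired up as PythonOperator(task_id="ensure-file", python_callable=ensure_file, op_args=[FILE_FOLDER], dag=testDag). The task succeeds whether or not the file already existed, so downstream tasks can run either way.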

    If you really do want to wait for a given period before creating the file, I suggest you move the file-creation logic out of your on_retry_callback and into a PythonOperator task downstream of the sensor with the all_failed trigger rule, so it only runs if the sensor fails. Then everything downstream of the original sensor can run even if the sensor fails. You may need to give the next task a trigger_rule of one_success, so it runs when either the sensor or the file-creation task succeeds.