
Can you put retries on an individual TaskFlow @task?


All the examples of retries in the Airflow docs use classic operators such as BashOperator. Manual tests on version 2.5.1 show that declaring retries as a function parameter doesn't work for a TaskFlow task:

    @task
    def test_retries(retries=2):
        raise ValueError("I failed, please retry")

    test_retries()

Is there any way to set retries on one particular TaskFlow task, rather than through default_args for all tasks?


Solution

  • Yes, absolutely, but you set it (along with other BaseOperator parameters such as pool, email_on_failure, retry_delay, and queue) on the decorator itself rather than in the function signature:

    @task(retries=2)
    def test_retries():
        raise ValueError("I failed, please retry")
    
    test_retries()