All the examples of task retries in the Airflow docs use classic operators such as BashOperator. In manual tests on version 2.5.1, the following does not trigger any retries with the TaskFlow API:
@task
def test_retries(retries=2):
    raise ValueError("I failed, please retry")
test_retries()
Is there any way to set retries on a particular TaskFlow task, rather than only through default args for all tasks?
Yes, absolutely, but you set it (and other BaseOperator parameters like pool, email_on_failure, retry_delay, queue, etc.) on the decorator itself:
@task(retries=2)
def test_retries():
    raise ValueError("I failed, please retry")
test_retries()
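If it helps to see why the keyword has to go on the decorator, here is a rough plain-Python sketch. It is not Airflow's implementation: toy_task, attempts_made and the retry loop are hypothetical names invented for illustration, and real Airflow retries are handled by the scheduler, not by an in-process loop. The point is only that a default argument on the function is seen by the function body, while a keyword passed to the decorator is seen by whatever wraps the function.

```python
import functools


def toy_task(retries=0):
    """Hypothetical stand-in for a task decorator (NOT Airflow's code)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wrapper.attempts = 0
            while True:
                wrapper.attempts += 1
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Give up once the first try plus `retries` re-runs are used.
                    if wrapper.attempts > retries:
                        raise

        wrapper.attempts = 0
        return wrapper

    return decorator


@toy_task()
def retries_on_function(retries=2):
    # `retries` here is just an unused function argument; the wrapper never sees it.
    raise ValueError("I failed, please retry")


@toy_task(retries=2)
def retries_on_decorator():
    # `retries` was passed to the decorator, so the wrapper honours it.
    raise ValueError("I failed, please retry")


def attempts_made(task_func):
    """Run a toy task, swallow its final failure, and report how often it ran."""
    try:
        task_func()
    except ValueError:
        pass
    return task_func.attempts
```

With this toy version, attempts_made(retries_on_function) gives 1 and attempts_made(retries_on_decorator) gives 3 (the first try plus two retries), which mirrors the behaviour described above: the setting only takes effect when it is passed to the decorator.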