I am generating a start and end time for an API query using the following:
from datetime import datetime, timedelta
import pytz
startTime = datetime.now(pytz.timezone('US/Eastern')) - timedelta(hours=1)
endTime = datetime.now(pytz.timezone('US/Eastern'))
This works great and generates the correct parameters for the API query. But I noticed that if the task fails and I rerun it, new values for startTime and endTime are used, based on when the task is re-executed.
I am trying to figure out how to make this more idempotent, so that if the task fails I can rerun it and the same startTime and endTime from the original task execution will be used.
I have read a bit about templates and macros but I can't seem to get it to work correctly.
Here is the task code. I am using the KubernetesPodOperator.
ant_get_logs = KubernetesPodOperator(
    env_vars={
        "startTime": startTime.strftime('%Y-%m-%d %H:%M:%S'),
        "endTime": endTime.strftime('%Y-%m-%d %H:%M:%S'),
        "timeZone": 'US/Eastern',
        "session": 'none',
    },
    volumes=[volume],
    volume_mounts=[volume_mount],
    task_id='ant_get_logs',
    image='test:1.0.0',
    image_pull_policy='Always',
    in_cluster=True,
    namespace=namespace,
    name='kubepod_ant_get_logs',
    random_name_suffix=True,
    labels={'app': 'backend', 'env': 'dev'},
    reattach_on_restart=True,
    is_delete_operator_pod=True,
    get_logs=True,
    log_events_on_failure=True,
)
Thanks
The best and easiest way to get an idempotent task with Airflow is to leverage templates and the logical_date (formerly execution_date) variable.
Every DAG run and task instance has an associated data interval and logical date, which depend on the DAG's schedule. In your case, if your DAG has an hourly schedule, a new DAG run is created each hour with its own logical date. This logical date won't change even if you restart the task, which ensures idempotence. So you can rewrite the env vars as
"startTime": "{{ dag_run.logical_date.strftime('%Y-%m-%d %H:%M:%S') }}"
"endTime": "{{ (dag_run.logical_date + macros.timedelta(hours = 1)).strftime('%Y-%m-%d %H:%M:%S') }}"
or
"startTime": "{{ data_interval_start.strftime('%Y-%m-%d %H:%M:%S') }}"
"endTime": "{{ data_interval_end.strftime('%Y-%m-%d %H:%M:%S') }}"
NOTE: you can use templates in the env_vars field because it is a templated field of the KubernetesPodOperator.
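Putting it together, a minimal sketch of your task with templated env_vars (every other argument stays exactly as in your original operator):
ant_get_logs = KubernetesPodOperator(
    env_vars={
        # Rendered per DAG run from its data interval, so rerunning the
        # same DAG run reuses identical values.
        "startTime": "{{ data_interval_start.in_timezone('US/Eastern').strftime('%Y-%m-%d %H:%M:%S') }}",
        "endTime": "{{ data_interval_end.in_timezone('US/Eastern').strftime('%Y-%m-%d %H:%M:%S') }}",
        "timeZone": 'US/Eastern',
        "session": 'none',
    },
    task_id='ant_get_logs',
    image='test:1.0.0',
    # ...all other arguments unchanged...
)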
I highly suggest you read the Airflow DAG Runs documentation to better understand the logical date concept, and the Airflow Templates reference for a list of all the variables and macros you can use in templates.
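You can also check what the templated fields render to for a specific run with the tasks render CLI command (the DAG id below is just a placeholder for yours):
airflow tasks render your_dag_id ant_get_logs "2023-01-01T05:00:00+00:00"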