Tags: python, jinja2, airflow, idempotent

Apache Airflow idempotent DAG implementation


I am generating a start and end time for an API query using the following:

from datetime import datetime, timedelta
import pytz

startTime = datetime.now(pytz.timezone('US/Eastern')) - timedelta(hours=1)
endTime = datetime.now(pytz.timezone('US/Eastern'))

This works great and generates the correct parameters for the API query. But I noticed that if the task fails and I rerun it, it uses new values for startTime and endTime based on when the rerun happens, not on the original run.

I am trying to figure out how to make this idempotent, so that if the task fails I can rerun it and it will use the same startTime and endTime as the original task execution.

I have read a bit about templates and macros but I can't seem to get them to work correctly.

Here is the task code. I am using the KubernetesPodOperator.

ant_get_logs = KubernetesPodOperator(
    env_vars={
        "startTime": startTime.strftime('%Y-%m-%d %H:%M:%S'),
        "endTime": endTime.strftime('%Y-%m-%d %H:%M:%S'),
        "timeZone":'US/Eastern',
        "session":'none',
    },

    volumes=[volume],
    volume_mounts=[volume_mount],

    task_id='ant_get_logs',
    image='test:1.0.0',
    image_pull_policy='Always',
    in_cluster=True,
    namespace=namespace,
    name='kubepod_ant_get_logs',
    random_name_suffix=True,
    labels={'app': 'backend', 'env': 'dev'},
    reattach_on_restart=True,
    is_delete_operator_pod=True,
    get_logs=True,
    log_events_on_failure=True,
)

Thanks


Solution

  • The best and easiest way to get an idempotent task in Airflow is to leverage templates and the logical date (formerly execution_date).

    Every DAG run and task instance has an associated data interval and logical date, which depend on the DAG's schedule.

    In your case, if your DAG has an hourly schedule, Airflow will create a new DAG run each hour, each with its own logical date. This logical date won't change even if you restart the task, which ensures idempotence. So you can rewrite the env_vars as

    "startTime": "{{ dag_run.logical_date.strftime('%Y-%m-%d %H:%M:%S') }}"
    "endTime": "{{ (dag_run.logical_date + macros.timedelta(hours = 1)).strftime('%Y-%m-%d %H:%M:%S') }}"
    

    or

    "startTime": "{{ data_interval_start.strftime('%Y-%m-%d %H:%M:%S') }}"
    "endTime": "{{ data_interval_end.strftime('%Y-%m-%d %H:%M:%S') }}"
    

    NOTE: you can use templates in env_vars because it is a templated field of KubernetesPodOperator.
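
    Putting it together, here is a minimal sketch of your task with the templated values. It is a sketch rather than a drop-in: volume, volume_mount, and namespace are assumed to be defined elsewhere in your DAG as in your original snippet, and the import path can vary by provider version.

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    ant_get_logs = KubernetesPodOperator(
        env_vars={
            # Rendered by Airflow before the pod starts; stable across retries
            "startTime": "{{ data_interval_start.strftime('%Y-%m-%d %H:%M:%S') }}",
            "endTime": "{{ data_interval_end.strftime('%Y-%m-%d %H:%M:%S') }}",
            "timeZone": 'US/Eastern',
            "session": 'none',
        },
        volumes=[volume],
        volume_mounts=[volume_mount],
        task_id='ant_get_logs',
        image='test:1.0.0',
        image_pull_policy='Always',
        in_cluster=True,
        namespace=namespace,
        name='kubepod_ant_get_logs',
        random_name_suffix=True,
        labels={'app': 'backend', 'env': 'dev'},
        reattach_on_restart=True,
        is_delete_operator_pod=True,
        get_logs=True,
        log_events_on_failure=True,
    )

    One caveat: data_interval_start and data_interval_end are timezone-aware pendulum datetimes, normally in UTC. If your API expects US/Eastern wall-clock times like your original code produced, convert inside the template, e.g. {{ data_interval_start.in_timezone('US/Eastern').strftime('%Y-%m-%d %H:%M:%S') }}.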

    I highly suggest you read Airflow DAG Runs to better understand the logical date concept, and the Airflow Templates reference for a list of all the variables and macros you can use in templates.
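
    If you want to check what the templates resolve to for a given run without launching a pod, the Airflow CLI can render a task's templated fields. The DAG id my_dag below is a placeholder for your own:

    airflow tasks render my_dag ant_get_logs 2023-01-01T10:00:00+00:00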