airflow

How pull xcom variable from previous run


How pull xcom variable from previous run in airflow? Is it possible?

I want to use value from same task_id in previous run_id as jinja variable for data argument in SimpeHttpOperator.

f

I'm looking in macros docs https://airflow.apache.org/docs/stable/macros.html and cant't find any documented way to do this.

UPD Example:

select_expired = SimpleHttpOperator(
    task_id='select_expired',
    http_conn_id='clickhouse_http',
    endpoint='/',
    method='POST',
    data=REQUESTED_EXPIRED_FLIGHTS,
    xcom_push=True,
    pool='clickhouse_select',
    dag=dag
)

where REQUESTED_EXPIRED_FLIGHTS is:

insert into table where column = '{{ ??????? (value returned in previous task) }}'

Solution

  • I make this function to get previous execution date, task state, xcom value. This works with Airflow 2.8.2.

    def check_last_run_date(context):
        previous_execution_date = False
        previous_dagrun = context['ti'].get_previous_dagrun()
        previous_task_state = False
        previous_xcom_value = False
        if previous_dagrun:
            previous_ti = previous_dagrun.get_task_instance(task_id)
            if previous_ti:
                previous_execution_date = previous_ti.execution_date
                previous_task_state = previous_ti.state
                previous_xcom_value = previous_ti.xcom_pull(task_ids=task_id)
    

    You can just delete useless for you data.

    How I add this function to my tasks:

    @task
    def example_task(... , **kwargs):
        context = kwargs
        last_run_date = check_last_run_date(context)