How can I pull an XCom value from a previous run in Airflow? Is it possible?
I want to use the value pushed by the same task_id in the previous run_id as a Jinja variable for the data
argument of SimpleHttpOperator.
I've looked through the macros docs https://airflow.apache.org/docs/stable/macros.html and can't find any documented way to do this.
UPD Example:
select_expired = SimpleHttpOperator(
    task_id='select_expired',
    http_conn_id='clickhouse_http',
    endpoint='/',
    method='POST',
    data=REQUESTED_EXPIRED_FLIGHTS,
    xcom_push=True,
    pool='clickhouse_select',
    dag=dag
)
where REQUESTED_EXPIRED_FLIGHTS is:
insert into table where column = '{{ ??????? (value returned in previous task) }}'
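I wrote this function to get the previous run's execution date, task state, and XCom value. It works with Airflow 2.8.2.
def check_last_run_date(context):
    # These defaults are returned when there is no previous run,
    # or when the task did not run in it.
    previous_execution_date = False
    previous_task_state = False
    previous_xcom_value = False
    # Assumption: look up the same task in the previous run, as the question asks.
    task_id = context['ti'].task_id
    previous_dagrun = context['ti'].get_previous_dagrun()
    if previous_dagrun:
        previous_ti = previous_dagrun.get_task_instance(task_id)
        if previous_ti:
            previous_execution_date = previous_ti.execution_date
            previous_task_state = previous_ti.state
            # Called on the previous task instance, so it reads that run's XCom.
            previous_xcom_value = previous_ti.xcom_pull(task_ids=task_id)
    return previous_execution_date, previous_task_state, previous_xcom_value
You can drop whichever values you don't need.
This is how I call the function from my tasks:
from airflow.decorators import task

@task
def example_task(**kwargs):  # plus whatever other arguments your task takes
    context = kwargs
    last_run_date, last_state, last_xcom = check_last_run_date(context)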
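Since the question asks for a Jinja variable, one option might be to wrap the same lookup in a small helper and expose it to templates through the DAG's user_defined_macros argument. This is only a sketch under assumptions: it expects `ti` to behave like an Airflow 2.x TaskInstance, the name `previous_xcom` and its `default` parameter are made up for illustration, and the wiring shown in the trailing comment has not been run against a real DAG.

```python
from typing import Any, Optional


def previous_xcom(ti: Any, task_id: Optional[str] = None, default: Any = None) -> Any:
    """Return the XCom pushed by `task_id` in the DAG run before `ti`'s run.

    `ti` is assumed to expose get_previous_dagrun(), like an Airflow 2.x
    TaskInstance. `default` is returned on the first run, when the task did
    not run in the previous DAG run, or when it pushed nothing.
    """
    # Default to the same task, as in the question.
    task_id = task_id or ti.task_id
    previous_dagrun = ti.get_previous_dagrun()
    if previous_dagrun is None:
        return default
    previous_ti = previous_dagrun.get_task_instance(task_id)
    if previous_ti is None:
        return default
    # xcom_pull is called on the previous task instance, so it reads that run.
    value = previous_ti.xcom_pull(task_ids=task_id)
    return default if value is None else value


# Hypothetical wiring (not executed here): register the helper as a macro
# and call it with the `ti` object that Airflow puts in the template context.
#
#   dag = DAG(..., user_defined_macros={"previous_xcom": previous_xcom})
#
#   REQUESTED_EXPIRED_FLIGHTS = (
#       "insert into table where column = '{{ previous_xcom(ti) }}'"
#   )
```

Passing a `default` keeps the very first run from rendering the string "None" into the query; pick whatever sentinel makes sense for your SQL.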