We recently refactored one of our DAGs, extracting much of its code into a shareable module. A true refactor shouldn't change the behaviour of the DAG, but unfortunately a mistake was made: the behaviour did change, and this ultimately caused the DAG to fail.
Visually, the DAG looked the same before and after the change. The change in behaviour was caused by different arguments being passed to a script run by a KubernetesPodOperator task. I compared two runs of the DAG: those immediately before and after the change was applied. The difference is visible in the Airflow UI on the Rendered Template page for the same task in each of those DAG runs.
I'd like to compare two runs of a DAG, using the information shown on this "Rendered Template" page, to check whether the behaviour has changed. Is it possible to obtain this information programmatically?
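I've figured out that it's possible to do this using Airflow's stable REST API. Here's a basic script that fetches the same task instance from two DAG runs and diffs them, authenticating with a username and password (this requires the API's basic auth backend to be enabled):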
import argparse  # command-line argument parsing
import os
from pprint import pprint
from urllib.parse import quote

import requests
from requests.auth import HTTPBasicAuth
from deepdiff import DeepDiff  # recursive comparison of the two JSON payloads


def get_task_instance_info(dag_id, task_id, dag_run_id, base_url, username, password):
    """Fetch one task instance from the Airflow stable REST API."""
    # Run IDs such as "scheduled__2024-01-01T00:00:00+00:00" contain ':' and '+',
    # so percent-encode each path segment.
    url = (
        f"{base_url}/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}"
        f"/taskInstances/{quote(task_id, safe='')}"
    )
    response = requests.get(url, auth=HTTPBasicAuth(username, password))
    response.raise_for_status()  # raise on any 4xx/5xx response instead of returning None
    return response.json()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Get Airflow task instance info and compare two runs.')
    parser.add_argument('airflow_base_url', type=str, help='Airflow base URL')
    parser.add_argument('dag_id', type=str, help='DAG ID')
    parser.add_argument('task_id', type=str, help='Task ID')
    parser.add_argument('dag_run_id_1', type=str, help='First DAG run ID')
    parser.add_argument('dag_run_id_2', type=str, help='Second DAG run ID')
    args = parser.parse_args()

    api_url = f"{args.airflow_base_url.rstrip('/')}/api/v1"
    # Credentials come from the environment rather than the command line
    USERNAME = os.getenv('AIRFLOW_API_USERNAME')
    PASSWORD = os.getenv('AIRFLOW_API_PASSWORD')
    if not USERNAME or not PASSWORD:
        parser.error('set AIRFLOW_API_USERNAME and AIRFLOW_API_PASSWORD')

    task_instance_info_1 = get_task_instance_info(args.dag_id, args.task_id, args.dag_run_id_1, api_url, USERNAME, PASSWORD)
    task_instance_info_2 = get_task_instance_info(args.dag_id, args.task_id, args.dag_run_id_2, api_url, USERNAME, PASSWORD)

    # Print every field that differs between the two task instances
    differences = DeepDiff(task_instance_info_1, task_instance_info_2)
    pprint(differences)
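Running DeepDiff over the whole task-instance payload also reports fields that differ between any two runs, such as `start_date`, `end_date`, `duration`, `hostname` and `pid`. In recent Airflow 2.x releases (I believe 2.3 onwards) the task instance response also includes a `rendered_fields` object, which should correspond to the values shown on the Rendered Template page, so comparing only that object narrows the output to the templated arguments. Below is a minimal sketch of that idea. It uses only the standard library (`urllib.request` in place of requests, and a small recursive diff in place of DeepDiff), the helper names `fetch_rendered_fields` and `json_diff` are my own, and it assumes the same basic-auth setup as the script above.

```python
import base64
import json
import urllib.parse
import urllib.request
from typing import Any, List


def fetch_rendered_fields(api_url: str, dag_id: str, task_id: str,
                          dag_run_id: str, username: str, password: str) -> dict:
    """Fetch one task instance and return only its rendered template fields."""
    # Percent-encode each path segment; run IDs often contain ':' and '+'.
    segments = ("dags", dag_id, "dagRuns", dag_run_id, "taskInstances", task_id)
    url = api_url.rstrip("/") + "/" + "/".join(urllib.parse.quote(s, safe="") for s in segments)
    request = urllib.request.Request(url)
    # HTTP basic auth header, equivalent to what requests' HTTPBasicAuth sends
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(request) as response:  # raises HTTPError on 4xx/5xx
        payload = json.load(response)
    # Assumption: older Airflow versions may not return this field at all
    if "rendered_fields" not in payload:
        raise KeyError("no 'rendered_fields' in response; this Airflow version may not expose it")
    return payload["rendered_fields"]


def json_diff(old: Any, new: Any, path: str = "") -> List[str]:
    """Recursively describe where two JSON-like values differ (empty list if equal)."""
    if isinstance(old, dict) and isinstance(new, dict):
        changes: List[str] = []
        for key in sorted(set(old) | set(new), key=str):
            child = f"{path}.{key}" if path else str(key)
            if key not in old:
                changes.append(f"added {child}: {new[key]!r}")
            elif key not in new:
                changes.append(f"removed {child}: {old[key]!r}")
            else:
                changes.extend(json_diff(old[key], new[key], child))
        return changes
    if isinstance(old, list) and isinstance(new, list) and len(old) == len(new):
        changes = []
        for index, (a, b) in enumerate(zip(old, new)):
            changes.extend(json_diff(a, b, f"{path}[{index}]"))
        return changes
    # Scalars, type changes, or lists of different lengths: report the whole value
    return [] if old == new else [f"changed {path or '<root>'}: {old!r} -> {new!r}"]
```

Calling `json_diff(fetch_rendered_fields(api_url, dag_id, task_id, run_1, user, pw), fetch_rendered_fields(api_url, dag_id, task_id, run_2, user, pw))` returns one line per difference, for example a `changed arguments[1]: ...` entry when a KubernetesPodOperator's templated `arguments` differ, and an empty list when the rendered templates are identical.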