python airflow orchestration

Airflow UI parameters not being passed on to DAG


I am trying to run a DAG with a user-specified value for a parameter, using this documentation as a guide.

In the Airflow UI when I click the play button next to the DAG I see a page where my parameter schema_prefix is shown with its default value.

Problem: The DAG always runs with the default value for the parameter. I tried changing the value in the UI and the DAG still uses the default. I also tried setting the environment variable AIRFLOW__CORE__DAG_RUN_CONF_OVERRIDES_PARAMS to true, as mentioned here, but it has no effect (the DAG still runs with the default value).

Can anyone please help with this, or point me to the best practice for running a DAG with user-provided parameters from the UI?

It is suggested here to use Airflow variables via the Admin tab, but this does not work for our use case because all DAGs would then run with the same parameter values. We need different Airflow users to be able to trigger their own pipelines with their own parameters, for testing and other purposes.

Thank you

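from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import PythonOperator

# my_task_python_file is the task callable, defined elsewhere.

my_params = {
    "schema_prefix": Param(
        "ABC",
        description="Prefix to schema",
        type="string",
        minLength=2,
    )
}

with DAG(
    dag_id="test_dag_1",
    start_date=datetime(2024, 1, 1),
    params=my_params,
) as dag:
    python_task = PythonOperator(
        python_callable=my_task_python_file,
        task_id="python_task_id",
        # Passes the Param objects from my_params directly to the callable.
        op_kwargs={k: v for k, v in my_params.items()},
    )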

Solution

  • I figured out why it was not working. The op_kwargs in my original code were built directly from my_params, which, as far as I can tell, only holds the Param objects with their default values as defined when the DAG file is parsed. Airflow does not replace those objects with the values provided via the UI at runtime.

    To make it work, I had to reference the params object, which contains the runtime values, through templating. That way Airflow injects the runtime values when the task runs, as shown below:

    with DAG(
        dag_id="test_dag_1",
        start_date=datetime(2024, 1, 1),
        params=my_params,
    ) as dag:
        python_task = PythonOperator(
            python_callable=my_task_python_file,
            task_id="python_task_id",
            # Each value becomes the template string "{{ params.<key> }}",
            # which Airflow renders with the runtime value.
            op_kwargs={
                key: f"{{{{ params.{key} }}}}"
                for key in my_params
            },
        )
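
The quadruple braces in the f-string above are easy to misread, so here is a minimal sketch of what the comprehension actually produces. It runs without Airflow. The list param_keys is a hypothetical stand-in for the keys of my_params; nothing else is assumed.

```python
# Hypothetical stand-in for the keys of my_params (the real values are
# Param objects, but only the keys matter for building the templates).
param_keys = ["schema_prefix"]

# Inside an f-string, "{{" emits one literal "{", so "{{{{" emits "{{".
# The single-braced {key} is the only part that gets interpolated.
op_kwargs = {key: f"{{{{ params.{key} }}}}" for key in param_keys}

print(op_kwargs)  # {'schema_prefix': '{{ params.schema_prefix }}'}
```

Because op_kwargs is a templated field of PythonOperator, Airflow renders each of these strings when the task runs. Note that Jinja renders to strings by default, so non-string parameters arrive in the callable as text unless the DAG is created with render_template_as_native_obj=True.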