airflowairflow-webserver

Add parameters to dag


I would like to set some parameters to my dag file. I would like to add two parameters named: is_debug and seti. I would also like to set default values to them so if i do not specify them when running manually a dag them to be is_debug=False and seti='FG'. How can i achieve that? Please also instruct me how should i insert the values when triggering manually the dag from GUI in Configuration JSON (Optional): {}

This is my dag:

import lib.log as log
import traceback
import datetime
import json

def my_main(**kwargs):
    try:
        #if is_debug == True:
        #   log.log_it('U in debug mode')
        #else:
            log.log_it('U in normal mode')
        
        #if seti == 'FG':
        #   log.log_it('FG')
        #else:
        #   log.log_it('RX')


from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


dag = DAG(
    'my_main', 
    description='my_main', 
    start_date=datetime.datetime(2023,6,8,1,0), 
    schedule_interval="0 1 * * *",
    catchup=False
)
task_my_main = PythonOperator(task_id='task_my_main_main', provide_context=True, python_callable=my_main, dag=dag)

Solution

  • You can add params to the Dag and when you run it manually you can change it accordingly.

    from datetime import datetime
    
    from airflow import DAG
    from airflow.models import Param
    from airflow.operators.python import PythonOperator
    
    is_debug_param = "{{ params.is_debug }}"
    seti_param = "{{ params.seti }}"
    
    
    def my_main(is_debug: bool, seti: str):
        print(is_debug)
        print(seti)
    
    
    with DAG(
            dag_id="my_main",
            description='my_main',
            start_date=datetime(2022, 6, 8, 1, 0),
            schedule_interval=None,
            catchup=False,
            params={
                "is_debug": Param(default=False, type="boolean", ),
                "seti": Param(default="FG", type="string", enum=["FG", "RX"]),
            },
            render_template_as_native_obj=True,
    ) as dag:
        task_my_main = PythonOperator(
            task_id='task_my_main_main',
            provide_context=True,
            python_callable=my_main,
            op_kwargs={
                "is_debug": is_debug_param,
                "seti": seti_param,
            },
        )
    
        (
            task_my_main
        )
    

    if you run it manually, you can trigger it with or without config

    enter image description here

    if you trigger it with config, you can fill the values you want otherwise it would be the defaults you set in the code.

    enter image description here