airflowairflow-2.xairflow-taskflow

Get dag_run context in Airflow TaskFlow task


My dag is started with configuration JSON:

{"foo" : "bar"}

I have a Python operator which uses this value:

my_task = PythonOperator(
    task_id="my_task",
    op_kwargs={"foo": "{{ dag_run.conf['foo'] }}"},
    python_callable=lambda foo: print(foo))

I’d like to replace it with a TaskFlow task…

@task
def my_task:
  # how to get foo??

How can I get a reference to context, dag_run, or otherwise get to the configuration JSON from here?


Solution

  • There are several ways to do this using the TaskFlow API:

    import datetime
    
    from airflow.decorators import dag, task
    from airflow.operators.python import get_current_context
    
    
    @dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
    def so_75303816():
        @task
        def example_1(**context):
            foo = context["dag_run"].conf["foo"]
            print(foo)
    
        @task
        def example_2(dag_run=None):
            foo = dag_run.conf["foo"]
            print(foo)
    
        @task
        def example_3():
            context = get_current_context()
            foo = context["dag_run"].conf["foo"]
            print(foo)
    
        @task
        def example_4(params=None):
            foo = params["foo"]
            print(foo)
    
        example_1()
        example_2()
        example_3()
        example_4()
    
    
    so_75303816()
    

    Depending on your needs/preference, you can use one of the following examples:

    For more information, see https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#accessing-context-variables-in-decorated-tasks and https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables.