My DAG is started with this configuration JSON:
{"foo" : "bar"}
I have a Python operator which uses this value:
my_task = PythonOperator(
    task_id="my_task",
    op_kwargs={"foo": "{{ dag_run.conf['foo'] }}"},
    python_callable=lambda foo: print(foo),
)
I’d like to replace it with a TaskFlow task…
@task
def my_task():
    # how to get foo??
    ...
How can I get a reference to context, dag_run, or otherwise get to the configuration JSON from here?
There are several ways to do this using the TaskFlow API:
import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context


@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
def so_75303816():
    @task
    def example_1(**context):
        foo = context["dag_run"].conf["foo"]
        print(foo)

    @task
    def example_2(dag_run=None):
        foo = dag_run.conf["foo"]
        print(foo)

    @task
    def example_3():
        context = get_current_context()
        foo = context["dag_run"].conf["foo"]
        print(foo)

    @task
    def example_4(params=None):
        foo = params["foo"]
        print(foo)

    example_1()
    example_2()
    example_3()
    example_4()


so_75303816()
Depending on your needs/preference, you can use one of the following examples:
example_1: You receive all task instance context variables via **context and extract "foo" from dag_run.conf yourself.
example_2: You state explicitly, via the function's arguments, that you want only dag_run from the task instance context variables. Note that such arguments must default to None.
example_3: You can also fetch the task instance context variables from inside a task using airflow.operators.python.get_current_context().
example_4: The DAG run configuration is also available via a context variable named "params".

For more information, see https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#accessing-context-variables-in-decorated-tasks and https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#variables.
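If it helps to see why example_1 receives everything while example_2 and example_4 receive a single variable, here is a rough plain-Python sketch of signature-based injection. It is a simplified stand-in and not Airflow's actual code: call_with_context, fake_context, and the wants_* functions are hypothetical names made up for this illustration, and the fake context holds only three of the real context keys.

```python
import inspect
from types import SimpleNamespace


def call_with_context(func, context):
    """Hypothetical helper: pass `func` only the context keys it asks for."""
    params = inspect.signature(func).parameters
    accepts_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kwargs:
        # Like example_1: a **context parameter receives every key.
        return func(**context)
    # Like example_2 and example_4: only the named keys are passed.
    return func(**{name: context[name] for name in params if name in context})


# Fake context standing in for a run triggered with {"foo": "bar"}.
fake_context = {
    "dag_run": SimpleNamespace(conf={"foo": "bar"}),
    "params": {"foo": "bar"},
    "ds": "2023-01-01",
}


def wants_everything(**context):
    return context["dag_run"].conf["foo"]


def wants_dag_run(dag_run=None):
    return dag_run.conf["foo"]


def wants_params(params=None):
    return params["foo"]


results = [
    call_with_context(f, fake_context)
    for f in (wants_everything, wants_dag_run, wants_params)
]
print(results)
```

Writing the task body as an ordinary function of dag_run or params, as in example_2 and example_4, also makes it easy to call directly with a fake object like this in a unit test, without a running scheduler.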