I am writing an Airflow DAG that reads a bunch of configs from the database and then executes a series of Python scripts using the BashOperator. The configs read earlier are passed as arguments.
The problem is that I can't find an efficient way to share the config with the downstream operators. I designed the DAG below, and these are my concerns.
I am not sure how many DB calls will be made to fetch the values required inside the Jinja templates (in the example below).
Also, since the config is the same in every task, I am not sure it's a good idea to fetch it from the database every time; that's why I don't want to use XCom either. I used an Airflow Variable because the JSON parsing can happen in a single line, but I guess the database-call issue is still there.
class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        s = hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        s = s.set_index('laptopName', drop=False)
        print(s)
        # store the result as JSON in an Airflow Variable so downstream tasks can template it
        s = s.to_json(orient='index')
        Variable.set('jobconfig', s)
t1 = ReturningMySqlOperator(
    task_id='mysql_query',
    sql='SELECT * FROM laptops',
    mysql_conn_id='mysql_db_temp',
    dag=dag)

t3 = BashOperator(
    task_id='sequence_one',
    bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t4 = BashOperator(
    task_id='sequence_two',
    bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t5 = BashOperator(
    task_id='sequence_three',
    bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)

t6 = BashOperator(
    task_id='sequence_four',
    bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',
    dag=dag)
t1 >> t3
t3 >> [t4,t6]
I am not sure how many DB calls will be made to fetch the values required inside the jinja templates (in the below example).
In the example you provided, you are making two connections to the metadata DB in each sequence_x task, one for each {{var.json.jobconfig.xx}} call. The good news is that those calls are not executed by the scheduler, so they are not made on every heartbeat interval. From the Astronomer guide:
Since all top-level code in DAG files is interpreted every scheduler "heartbeat," macros and templating allow run-time tasks to be offloaded to the executor instead of the scheduler.
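As a side note, each of those {{var.json.jobconfig...}} expressions triggers its own Variable lookup when the command is rendered, so you could cut the reads down to one per task by resolving the Variable once inside the template with a Jinja set statement. This is just a sketch reusing one task from your DAG, not something required for the approach below:

t3 = BashOperator(
    task_id='sequence_one',
    # resolve the jobconfig Variable once per rendering, then reuse the parsed dict
    bash_command=(
        '{% set cfg = var.json.jobconfig.Legion %}'
        'python3 path/sequence1.py {{ cfg.laptopName }} {{ cfg.company }}'
    ),
    dag=dag)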
I think the key aspect here is that the value you want to pass downstream is always the same and won't change after T1 has executed.
There may be a few approaches here, but if you want to minimize the number of calls to the DB and avoid XComs entirely, you should use the TriggerDagRunOperator.
To do so, you have to split your DAG into two parts: a controller DAG with the task that fetches the data from MySQL, which then triggers a second DAG where you execute all of the BashOperator tasks using the values obtained in the controller DAG. You can pass the data in through the conf parameter.
Here is an example based on the official Airflow example DAGs:
from airflow import DAG
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def _data_from_mysql():
    # fetch data from the DB or anywhere else
    # and store it as a JSON-serialized Variable
    data = {'legion': {'company': 'some_company', 'laptop': 'great_laptop'}}
    Variable.set('jobconfig', data, serialize_json=True)


dag = DAG(
    dag_id="example_trigger_controller_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval="@once",
    tags=['example'],
)

get_data_from_MySql = PythonOperator(
    task_id='get_data_from_MySql',
    python_callable=_data_from_mysql,
    dag=dag,
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    # Ensure this equals the dag_id of the DAG to trigger
    trigger_dag_id="example_trigger_target_dag",
    conf={"message": "Company is {{var.json.jobconfig.legion.company}}"},
    execution_date='{{ds}}',
    dag=dag,
)

get_data_from_MySql >> trigger
When the trigger task executes, it will include the key message as part of the configuration of the DAG run it creates for the second DAG.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",
    default_args={"owner": "airflow"},
    start_date=days_ago(2),
    schedule_interval=None,
    tags=['example'],
)


def run_this_func(**context):
    """
    Print the payload "message" passed to the DagRun conf attribute.

    :param context: The execution context
    :type context: dict
    """
    print("Remotely received value of {} for key=message".format(
        context["dag_run"].conf["message"]))


run_this = PythonOperator(
    task_id="run_this", python_callable=run_this_func, dag=dag)

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command='echo "Here is the message: $message"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
    dag=dag,
)
The logs of bash_task_1 in this example will include:
[2021-05-05 15:40:35,410] {bash.py:158} INFO - Running command: echo "Here is the message: $message"
[2021-05-05 15:40:35,418] {bash.py:169} INFO - Output:
[2021-05-05 15:40:35,419] {bash.py:173} INFO - Here is the message: Company is some_company
[2021-05-05 15:40:35,420] {bash.py:177} INFO - Command exited with return code 0
In short: the value stored in the Variable travels in conf and is read downstream via dag_run.conf.
This way you are only reading from the metadata DB once, when the second DAG is triggered.
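If you need more than one field downstream, instead of formatting a message you could forward the relevant fields themselves in conf. A sketch of the trigger task, assuming the same jobconfig Variable set above (the laptopName/company keys are simply what the snippet further down expects):

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="example_trigger_target_dag",
    # forward the individual fields so the target DAG can template them directly
    conf={
        "laptopName": "{{ var.json.jobconfig.legion.laptop }}",
        "company": "{{ var.json.jobconfig.legion.company }}",
    },
    execution_date='{{ds}}',
    dag=dag,
)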
Also, to avoid repeating too much code when defining the BashOperator tasks, you could do something like this:
templated_bash_cmd = """
python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
"""

bash_task_1 = BashOperator(
    task_id="bash_task_1",
    bash_command=templated_bash_cmd,
    params={
        'path_to_script': 'path/sequence1.py'
    },
    dag=dag
)
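You could then reuse the same template for all four sequence tasks and only vary params; a sketch using the task ids and script paths from your DAG:

# build the four sequence tasks from the same templated command
scripts = {
    'sequence_one': 'path/sequence1.py',
    'sequence_two': 'path/sequence2.py',
    'sequence_three': 'path/sequence3.py',
    'sequence_four': 'path/sequence4.py',
}

bash_tasks = {
    task_id: BashOperator(
        task_id=task_id,
        bash_command=templated_bash_cmd,
        params={'path_to_script': path},
        dag=dag,
    )
    for task_id, path in scripts.items()
}

# mirror the original dependencies: sequence_one fans out to sequences two and four
bash_tasks['sequence_one'] >> [bash_tasks['sequence_two'], bash_tasks['sequence_four']]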
Let me know if that worked for you!