I am trying to create an Airflow DAG that creates an EMR Serverless application.
EMRServerlessCreateApplicationOperator(
    task_id="create-emrs-app",
    job_type="SPARK",
    release_label="emr-7.1.0",
    config={
        "name": "{{dag_run.conf['application_name']}}",
        # ...other config
    },
    dag=main_dag,
)
However, the DAG fails with a validation exception saying that 'name' does not match the regex pattern.
While troubleshooting, I found that 'application_name' is not being rendered: the operator tries to create the EMR Serverless application with the literal name "{{dag_run.conf['application_name']}}".
To check whether Jinja template rendering works at all, I created one more task in the same DAG using PythonOperator, which prints application_name in its Python callable. This task works fine and prints the name.
PythonOperator(
    task_id="log_application_name",
    python_callable=print_app_name,
    op_kwargs={"application_name": "{{dag_run.conf['application_name']}}"},
    dag=main_dag,
)
I have also checked 'Rendered Template' in the Airflow UI for both tasks. For the PythonOperator task the rendered value is shown, but for the EMR application creation task the 'Rendered Template' section is empty.
Any idea why Jinja template rendering is not working for EMRServerlessCreateApplicationOperator?
I am using Airflow 2 and version 8.5 of the Amazon provider package.
The most likely cause is that EMRServerlessCreateApplicationOperator does not declare config as a templated field. Airflow only renders Jinja in the attributes an operator explicitly lists in its template_fields property, which would also explain why the 'Rendered Template' section is empty for that task.
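To make the mechanism concrete, here is a toy, standard-library-only sketch of the idea. It is not Airflow's implementation: ToyOperator, ToyTemplatedOperator and the regex-based render helper are hypothetical stand-ins for an operator and for Jinja, and the flat application_name context key is a simplification of dag_run.conf. It only illustrates that an attribute is rendered when, and only when, it is listed in template_fields.

```python
import re

# Hypothetical stand-in for Jinja: replaces "{{ key }}" with context[key].
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Render strings, recursing into dicts, lists and tuples."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    return value


class ToyOperator:
    # Nothing is rendered unless its attribute name is listed here.
    template_fields = ()

    def __init__(self, config):
        self.config = config

    def render_template_fields(self, context):
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


class ToyTemplatedOperator(ToyOperator):
    # Opt the 'config' attribute into rendering.
    template_fields = (*ToyOperator.template_fields, "config")


context = {"application_name": "my-app"}
plain = ToyOperator(config={"name": "{{ application_name }}"})
templated = ToyTemplatedOperator(config={"name": "{{ application_name }}"})
plain.render_template_fields(context)
templated.render_template_fields(context)

print(plain.config)      # {'name': '{{ application_name }}'}
print(templated.config)  # {'name': 'my-app'}
```

The first operator passes the literal placeholder through untouched, which mirrors the failing EMR Serverless task; the second renders it because config is declared in template_fields.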
As described in the templating section of the Airflow documentation on creating a custom operator, you can achieve what you want by subclassing EMRServerlessCreateApplicationOperator and adding config to its template_fields. Because Airflow renders the string values inside a templated dict, the nested name value is rendered too. This approach gives you control over which fields are rendered.
# Note: in the Amazon provider package the class name is spelled with "Emr", not "EMR".
from airflow.providers.amazon.aws.operators.emr import EmrServerlessCreateApplicationOperator


class CustomEmrServerlessCreateApplicationOperator(EmrServerlessCreateApplicationOperator):
    # Keep any fields the parent already templates and add 'config'.
    template_fields = (*EmrServerlessCreateApplicationOperator.template_fields, "config")


# Now use this custom operator in your DAG
CustomEmrServerlessCreateApplicationOperator(
    task_id="create-emrs-app",
    job_type="SPARK",
    release_label="emr-7.1.0",
    config={
        "name": "{{ dag_run.conf['application_name'] }}",
        # other config fields
    },
    dag=main_dag,
)