I'm trying to get my head around the TaskFlow API and XCom in Airflow and am getting stuck; hoping someone here can help. I'm using `EmrServerlessCreateApplicationOperator` and I want to pass a value to it from another task that was created with the `@task` decorator. My DAG definition is:
```python
import datetime

from airflow.decorators import task, dag
from airflow.providers.amazon.aws.operators.emr import EmrServerlessCreateApplicationOperator


@dag(
    dag_id="demo-xcom-problem",
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False
)
def taskflow():
    @task(multiple_outputs=True)
    def config() -> dict:
        return {
            "name": "my-spark-app",
        }

    create_app_fails = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app_fails",
        job_type="SPARK",
        release_label="emr-6.9.0",
        config=config(),
        aws_conn_id="",
    )

    create_app_succeeds = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app_succeeds",
        job_type="SPARK",
        release_label="emr-6.9.0",
        config={
            "name": "my-spark-app",
        },
        aws_conn_id="",
    )

    create_app_fails
    create_app_succeeds


taskflow()
```
The `create_spark_app_fails` task fails with:

```
ERROR - Failed to execute job 730 for task create_spark_app_fails (botocore.client.ClientCreator._create_api_method.<locals>._api_call() argument after ** must be a mapping, not PlainXComArg; 2990)
```
From that error I'm guessing that I'm not passing the value properly, but I can't figure out how to do it. Hoping someone here can tell me how to solve this.
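The problem was that `EmrServerlessCreateApplicationOperator` could not (in the version I was using) accept templated values for the `config` parameter. Because `config` was not one of the operator's templated fields, the `PlainXComArg` returned by `config()` was never resolved to the task's return value and was handed to botocore's `**` unpacking as-is. I have blogged about a solution here: https://medium.com/@jamiekt/subclass-airflow-operators-to-make-parameters-templateable-b2b0ea64b39e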
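The mechanism can be illustrated with a toy, Airflow-free model. Every class and function below is hypothetical and is not Airflow's or boto3's API; it only mimics the idea that attributes named in `template_fields` are resolved from XCom before `execute()` runs. A subclass that appends `"config"` to its parent's `template_fields` therefore receives the dict, while the unmodified operator receives the placeholder object and its `**` unpacking fails with the same kind of `TypeError` as above.

```python
from typing import Any, Dict, Sequence


class ToyXComArg:
    """Hypothetical stand-in for an XComArg: a reference to another task's
    return value that is only known at run time."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def resolve(self, context: Dict[str, Any]) -> Any:
        # Look up the upstream task's pushed value in the run-time context.
        return context["xcom"][self.task_id]


class ToyBaseOperator:
    # Default: nothing is templated.
    template_fields: Sequence[str] = ()

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Only attributes named in template_fields are resolved; every other
        # attribute reaches execute() exactly as it was passed in.
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, ToyXComArg):
                setattr(self, field, value.resolve(context))


def create_application(**kwargs: Any) -> Dict[str, Any]:
    # Hypothetical stand-in for the AWS API call; just echoes its kwargs.
    return kwargs


class ToyCreateApplicationOperator(ToyBaseOperator):
    def __init__(self, config: Any) -> None:
        self.config = config

    def execute(self) -> Dict[str, Any]:
        # Mirrors the traceback: the config is unpacked with **.
        return create_application(**self.config)


class TemplatedConfigCreateApplicationOperator(ToyCreateApplicationOperator):
    # The fix: extend the parent's template_fields with "config".
    template_fields = (*ToyCreateApplicationOperator.template_fields, "config")


def run(op: ToyBaseOperator, context: Dict[str, Any]) -> Dict[str, Any]:
    # Render templated fields first, then execute, as a worker would.
    op.render_template_fields(context)
    return op.execute()


if __name__ == "__main__":
    ctx = {"xcom": {"config": {"name": "my-spark-app"}}}
    # Templated subclass: the placeholder is resolved to the upstream dict.
    print(run(TemplatedConfigCreateApplicationOperator(ToyXComArg("config")), ctx))
    # → {'name': 'my-spark-app'}
    # Unmodified operator: the placeholder reaches ** unresolved.
    try:
        run(ToyCreateApplicationOperator(ToyXComArg("config")), ctx)
    except TypeError as err:
        print(err)  # "...argument after ** must be a mapping, not ToyXComArg"
```

Against the real provider, the same idea would presumably be a subclass of `EmrServerlessCreateApplicationOperator` (give it any name, e.g. the hypothetical `TemplatedConfigEmrServerlessCreateApplicationOperator`) that sets `template_fields = (*EmrServerlessCreateApplicationOperator.template_fields, "config")` and is used in place of the original with `config=config()`. This relies on the operator keeping the argument in an attribute named `config`, which Airflow requires of any templated field. Newer releases of the Amazon provider may already template `config`, so check the installed version's `template_fields` first.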