Let's say I have an Airflow (2.3) DAG that looks like this:
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.athena import AthenaOperator

    @task
    def retrieve_ingest_setup_queries():
        settings = Variable.get("athena_settings", deserialize_json=True)
        # settings = {'drops': ["DROP TABLE my_table", "DROP TABLE my_table2"],
        #             'creates': ["CREATE TABLE ...", ...]}
        return settings

    @dag(
        dag_id='athena_something',
        default_args=default_args,
        schedule_interval=None,
        render_template_as_native_obj=True,
    )
    def somedag():
        ingest_setup = retrieve_ingest_setup_queries()

        ingest_db_setup_drops = AthenaOperator.partial(
            task_id='db_drops',
            database="{{ var.json.athena.database }}",
            output_location="{{ var.json.athena.output_location }}",
            aws_conn_id='aws_athena'
        ).expand(query=ingest_setup??????)

        ingest_db_setup_creates = AthenaOperator.partial(
            task_id='db_creates',
            database="{{ var.json.athena.database }}",
            output_location="{{ var.json.athena.output_location }}",
            aws_conn_id='aws_athena'
        ).expand(query=ingest_setup??????)
I am looking for a way to set "query" in the expand method to ingest_setup['drops'] for the first operator and to ingest_setup['creates'] for the second.
I could use two different retrieval functions, but I'd like to use only one, and I want to use TaskFlow if at all possible. expand() doesn't support templating, so I don't see how, or whether, this can be done (see the ?????? in the code).
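I needed to pass multiple_outputs=True to the task decorator, i.e. @task(multiple_outputs=True).
Each key of the returned dictionary is then pushed as its own XCom, so ingest_setup['drops'] and ingest_setup['creates'] work as intended in expand().
This only works with task decorators, though. Accessing a key of a dictionary that is a classic operator's result (XComArg) is far from intuitive. It is discussed here.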
From there, I have created the following class for operator results:
    class XcomDict:
        def __init__(self, operator: Operator):
            self.operator_output = str(operator.output).strip("{ }")

        def __getitem__(self, item: str):
            return f"{{{{ {self.operator_output}['{item}'] }}}}"
Of course, it assumes the operator's return value is a dictionary. I use it this way:
    job = CreateJobOperator(
        task_id='create_job', ...)

    wait = WaitForJobOperator(
        task_id='wait_for_job_to_complete',
        job_id=XcomDict(job)['JobId'], ...)
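
To make it clearer what XcomDict actually hands to the downstream operator, here is a small sketch that runs without Airflow. FakeOperator is a hypothetical stand-in, not an Airflow class, and the string it exposes as .output is my assumption of what str(operator.output) looks like in Airflow 2.3. The dag_id 'my_dag' is made up for illustration.

```python
class FakeOperator:
    """Hypothetical stand-in for an Airflow operator (not a real Airflow class)."""

    def __init__(self, task_id: str, dag_id: str):
        # Assumption: mirrors the string form of an operator's XComArg in Airflow 2.3.
        self.output = (
            "{{ task_instance.xcom_pull("
            f"task_ids='{task_id}', dag_id='{dag_id}', key='return_value'"
            ") }}"
        )


class XcomDict:
    """Builds a Jinja template that indexes into an operator's dict result."""

    def __init__(self, operator):
        # Drop the surrounding "{{ " and " }}" so the expression can be extended.
        self.operator_output = str(operator.output).strip("{ }")

    def __getitem__(self, item: str) -> str:
        # Re-wrap the expression in "{{ ... }}" with the key lookup appended.
        return f"{{{{ {self.operator_output}['{item}'] }}}}"


job = FakeOperator(task_id="create_job", dag_id="my_dag")
template = XcomDict(job)["JobId"]
print(template)
# {{ task_instance.xcom_pull(task_ids='create_job', dag_id='my_dag', key='return_value')['JobId'] }}
```

The result is just a Jinja string, so it is only resolved at run time if the receiving parameter (job_id here) is one of the operator's templated fields.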