airflowairflow-2.xmwaa

How to do nested templating using Jinja in an Airflow DAG?


The following template for bucket_key works and it pulls the output value from task aTestPyOperator:

sensor = S3KeySensor(
        task_id='check_s3_for_file_in_s3',
        bucket_name='my_bucket',
        bucket_key= "{{params.folder}}/{{ ti.xcom_pull(task_ids='aTestPyOperator') }}/",
        params={'folder': 'test'}
    )

But what I need to be able to do is pass in the name of the task that it should use.

sensor = S3KeySensor(
        task_id='check_s3_for_file_in_s3',
        bucket_name='my_bucket',
        bucket_key= "{{params.folder}}/{{ ti.xcom_pull(task_ids='{{params.fnName}}') }}/",
        params={'folder': 'test',
                'fnName': 'aTestPyOperator'}
    )

The above does not work and the template looks like "test/NONE". I think what its evaluating is ti.xcom_pull(task_ids='{{params.fnName}}') and not ti.xcom_pull(task_ids='aTestPyOperator').

I have also tried it like this (removed the nested braces) and that didn't work either.

bucket_key= "{{params.folder}}/{{ ti.xcom_pull(task_ids='params.fnName') }}/",

At the end, what I am trying to do is set the bucket key with the output value from the previous task. Is there an easier way to do that?


Solution

  • You can treat the params as a variable inside a Jinja template.

    bucket_key= "{{ params.folder }}/{{ ti.xcom_pull(task_ids=params.fnName) }}/",
    

    Even more, you can rewrite your template in that way:

    bucket_key= "{{ params.folder + '/' + ti.xcom_pull(task_ids=params.fnName) }}/",