This seems like a peculiar struggle, so I'm sure I'm missing something. I can't seem to pass values using XCom unless the tasks that provide and use the values are functions called from a PythonOperator. That works, so far so good.
But now I need to use the execution date in the SQL query. Since the query is embedded within a function, it isn't rendered by Jinja. I get why the {{ ds }} macro is not available outside of the operators; I'm just struggling with how to solve this in this case.
Example of what I'm doing currently:
def get_some_values(**context):
    hook = BigQueryHook(use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "SELECT value1, value2, value3 FROM some_dataset.some_table__{{ ds }}"
    )
    results = cursor.fetchone()
    # Store the results in XCom
    if results is not None:
        for i, result in enumerate(results):
            context['ti'].xcom_push(f'value{i+1}', result)

def send_slack_message(**context):
    # Retrieve the results from XCom
    value1 = context['ti'].xcom_pull(key='value1')
    value2 = context['ti'].xcom_pull(key='value2')
    value3 = context['ti'].xcom_pull(key='value3')
    slack_msg = """values returned: {}, {}, {} """.format(value1, value2, value3)
    send_slack_message = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        channel='#some_channel',
        message=slack_msg,
        username='airflow',
        dag=dag,
    )
    send_slack_message.execute(context=context)

dag = DAG(
    'test',
    default_args=default_args,
    schedule_interval='0 12 * * *',
    catchup=False,
)

get_values_to_output = PythonOperator(
    task_id='get_values_to_output',
    python_callable=get_some_values,
    provide_context=True,
    dag=dag
)

send_slack_message = PythonOperator(
    task_id='send_slack_message',
    python_callable=send_slack_message,
    provide_context=True,
    dag=dag
)
In this case the query fails because the template is never rendered, so it tries to select from a table literally named some_table__{{ ds }}.
How do I get the execution date in here? Or how do I pass values from a query to the next task without using a function?
(The current date is not good enough, since I want to be able to do back runs.)
The python_callable is not a template field, so Airflow does not render Jinja expressions inside the function body. Instead, Airflow provides the context variables to your function, and you can access them to format your string:
cursor.execute(
    f"SELECT value1, value2, value3 FROM some_dataset.some_table__{context['ds']}"
)
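To make the idea easier to try outside of Airflow, here is a minimal sketch that builds the query from the context in a small helper. The helper name build_query and its date_key parameter are my own (hypothetical, not part of Airflow), and the context is simulated with a plain dict. Note that ds is formatted as YYYY-MM-DD, so if your table suffix has no dashes you probably want ds_nodash (YYYYMMDD) instead; I'm assuming nothing about your actual table naming here.

```python
def build_query(context, date_key="ds"):
    """Hypothetical helper: format the query using a value from the task context.

    date_key selects which context variable to use, e.g. "ds" (YYYY-MM-DD)
    or "ds_nodash" (YYYYMMDD).
    """
    return (
        "SELECT value1, value2, value3 "
        f"FROM some_dataset.some_table__{context[date_key]}"
    )


def get_some_values(**context):
    # Airflow calls the python_callable with the context as keyword arguments,
    # so the execution date is available as context['ds'].
    query = build_query(context)
    # In the real task you would run cursor.execute(query) here,
    # using the hook and cursor from the question.
    return query


# Simulated context, only to show the formatting outside of Airflow.
fake_context = {"ds": "2021-03-01", "ds_nodash": "20210301"}
print(get_some_values(**fake_context))
print(build_query(fake_context, date_key="ds_nodash"))
```

Because the date comes from the context of the task instance rather than from the current date, back runs get the execution date of the run being processed.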