While working on a project I came across a situation where we want to perform a few tasks using Airflow, but we are not allowed to use the PythonOperator and are instructed to use the native BigQuery operators instead. Can anyone help me with code for this, e.g. how to set up an Airflow Variable or how to write conditional logic that gets executed by BigQuery operators? Is that possible? If not, my next question is: is it possible to fetch a result from a BigQuery table using a BigQuery operator and assign it to a Python variable, i.e. use both the BigQuery and Python operators together? Is there any way to do this?
So, here is a sample of how to use a BigQuery operator and send data to another task using cross-communication (XCom) with xcom_pull.
You can use the BigQueryGetDataOperator to read rows from a table, or the BigQueryOperator to run custom queries. BigQueryGetDataOperator pushes the selected rows to XCom as a list of lists, so you can pull the value in another task. I've used it in a BashOperator in the example:
import os

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryGetDataOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
BQ_LOCATION = "europe-north1"
TABLE_NAME = "<table-name>"
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")

with models.DAG(
    "example_bigquery_operations",
    schedule_interval="@once",  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    # Reads up to max_results rows from the table and pushes them
    # to XCom as a list of lists, e.g. [['Tom']].
    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        max_results=1,
        selected_fields="name",
        # location=BQ_LOCATION,
    )

    # Pulls the result of get_data from XCom and echoes it.
    get_dataset_result = BashOperator(
        task_id="get_dataset_result",
        bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
    )

    get_data >> get_dataset_result
When the DAG runs, the log of the get_dataset_result task shows the value pulled from XCom:

[2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
[2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=example_bigquery_operations
AIRFLOW_CTX_TASK_ID=get_dataset_result
AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
[2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
[2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
[2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
[2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0
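For the second part of your question: yes, you can mix both operator types. The same XCom value can be pulled inside a PythonOperator and assigned to an ordinary Python variable. A minimal sketch, assuming the PythonOperator is available in your environment and reusing the get_data task from above (handle_result and process_data are names I made up):

from airflow.operators.python import PythonOperator

def handle_result(ti):
    # Pull the list pushed by get_data, e.g. [['Tom']]
    rows = ti.xcom_pull(task_ids="get_data")
    # Assign the first field of the first row to a plain Python variable
    name = rows[0][0]
    print(f"Got name from BigQuery: {name}")

process_data = PythonOperator(
    task_id="process_data",
    python_callable=handle_result,
)

get_data >> process_data

And if you do have to stay on BigQuery operators only, the configuration field of BigQueryInsertJobOperator is templated, so you can render the XCom value straight into a follow-up query with no Python task in between. A sketch under that assumption (filter_by_name is a hypothetical task; adjust the query to your schema):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# The Jinja expression below is rendered at runtime with the value
# pushed to XCom by get_data, so the condition lives in SQL, not Python.
filter_by_name = BigQueryInsertJobOperator(
    task_id="filter_by_name",
    configuration={
        "query": {
            "query": (
                "SELECT * FROM `" + DATASET_NAME + "." + TABLE_NAME + "` "
                "WHERE name = '{{ task_instance.xcom_pull('get_data')[0][0] }}'"
            ),
            "useLegacySql": False,
        }
    },
)

get_data >> filter_by_name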