
How to achieve a Python task using local BQ operators from Airflow


While working on a project, I came across a situation where we want to perform a few tasks using Airflow, but we are not allowed to use the PythonOperator and are instructed to use the native BigQuery operators instead. Can anyone help me with such code, e.g. how to set up an Airflow Variable, or how to write conditional code that will be executed by BQ operators? Is that possible? If not, my next question is: is it possible to fetch a result from a BQ table using a BQ operator and assign it to a Python variable, so that I can use both BQ and Python operators? Is there any way to do this?


Solution

  • So, here is a sample of how to use a BigQuery operator and send its result to another task using cross-communication (XCom) via xcom_pull.

    You can use the BigQueryGetDataOperator, or the BigQueryOperator to query the data with custom queries. Those operators return the rows as a list, so you can pull it in another task. I've used it in a BashOperator in the example; sketches for the custom-query and Python-variable variants follow the log output:

    import os

    from airflow import models
    from airflow.operators.bash import BashOperator
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryGetDataOperator,
    )
    from airflow.utils.dates import days_ago
    
    PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<project-name>")
    BQ_LOCATION = "europe-north1"
    TABLE_NAME = "<table-name>"
    DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "<ds-name>")
    
    
    with models.DAG(
        "example_bigquery_operations",
        schedule_interval='@once',  # Override to match your needs
        start_date=days_ago(1),
        tags=["example"],
    ) as dag:
    
        # Fetches up to `max_results` rows of the selected fields and pushes
        # them to XCom as a list of lists, e.g. [['Tom']].
        get_data = BigQueryGetDataOperator(
            task_id="get_data",
            dataset_id=DATASET_NAME,
            table_id=TABLE_NAME,
            max_results=1,
            selected_fields="name",
            # location=BQ_LOCATION,  # set this if the dataset is not in the default location
        )
    
        
        # Pulls the list that get_data pushed to XCom and echoes it.
        get_dataset_result = BashOperator(
            task_id="get_dataset_result",
            bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"",
        )
        
        get_data >> get_dataset_result
    
    The log of the downstream task shows the value pulled from XCom:

    [2021-12-06 17:12:07,641] {logging_mixin.py:109} INFO - Running <TaskInstance: example_bigquery_operations.get_dataset_result 2021-12-05T00:00:00+00:00 [running]> on host airflow-worker-c92mz
    [2021-12-06 17:12:07,937] {taskinstance.py:1254} INFO - Exporting the following env vars:
    AIRFLOW_CTX_DAG_OWNER=airflow
    AIRFLOW_CTX_DAG_ID=example_bigquery_operations
    AIRFLOW_CTX_TASK_ID=get_dataset_result
    AIRFLOW_CTX_EXECUTION_DATE=2021-12-05T00:00:00+00:00
    AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-12-05T00:00:00+00:00
    [2021-12-06 17:12:07,939] {subprocess.py:52} INFO - Tmp dir root location: 
     /tmp
    [2021-12-06 17:12:07,939] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo "[[\'Tom\']]"']
    [2021-12-06 17:12:08,242] {subprocess.py:74} INFO - Output:
    [2021-12-06 17:12:08,245] {subprocess.py:78} INFO - [['Tom']]
    [2021-12-06 17:12:08,246] {subprocess.py:82} INFO - Command exited with return code 0
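
    To keep the conditional logic inside BigQuery itself (no PythonOperator), the pulled value can be templated straight into a follow-up query. A minimal sketch, assuming a provider version that ships BigQueryInsertJobOperator and reusing the placeholders from the DAG above; the filter_rows task id and the name filter are made up for illustration:

    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryInsertJobOperator,
    )

    # Inside the same `with models.DAG(...)` block as above.
    # Runs a query whose WHERE clause is rendered from the XCom value pushed
    # by get_data ([['Tom']] -> 'Tom'). Airflow evaluates the Jinja template
    # before submitting the job, so no Python task is involved.
    filter_rows = BigQueryInsertJobOperator(
        task_id="filter_rows",
        configuration={
            "query": {
                "query": (
                    f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}` "
                    "WHERE name = '{{ task_instance.xcom_pull('get_data')[0][0] }}'"
                ),
                "useLegacySql": False,
            }
        },
        location=BQ_LOCATION,
    )

    get_data >> filter_rows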
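
    And for the second question: if mixing operator types is allowed after all, the same XCom can be pulled into an ordinary Python variable inside a PythonOperator. A minimal sketch; the use_result callable and task id are hypothetical:

    from airflow.operators.python import PythonOperator

    # Airflow 2 injects the task instance into the callable via the `ti` argument.
    def use_result(ti):
        rows = ti.xcom_pull(task_ids="get_data")  # e.g. [['Tom']]
        name = rows[0][0]  # now a plain Python variable
        print(f"first name in the table: {name}")

    # Inside the same `with models.DAG(...)` block as above.
    use_in_python = PythonOperator(
        task_id="use_in_python",
        python_callable=use_result,
    )

    get_data >> use_in_python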