google-cloud-platform, google-bigquery, airflow, dataform

How to use the Dataform environments.json file when invoking the workflow from Airflow?


Context

I have an existing data pipeline in Airflow (through Google Cloud Composer) in which I execute Python and SQL-based tasks.

Airflow repository

The Airflow pipeline is defined in a GitHub repository, let's call it repository-airflow. Here is the DAG:

import datetime
import os
from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from utils.slack_alerts import task_fail_slack_alert

# Default arguments for the DAG
DEFAULT_ARGS = {
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=1),
    "on_failure_callback": task_fail_slack_alert,
}

# Environment variable of the dag
env = os.environ["ENV"]


# Create the DAG
with DAG(
    dag_id=f"dataform_test_{env}",
    start_date=datetime.datetime(
        2023, 12, day=4, tzinfo=datetime.timezone.utc
    ),
    end_date=None,
    dagrun_timeout=timedelta(hours=24),
    schedule="@once",
    default_args=DEFAULT_ARGS,
    catchup=False,
) as dag:
    PROJECT_ID = "my_project_id"
    REGION = "europe-west1"
    REPOSITORY_ID = "my_repository_id"
    # Create compilation
    create_compilation_result = (
        DataformCreateCompilationResultOperator(
            task_id="dataform_create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": "main" if env=="production" else "develop"
            },
        )
    )
    # Run all tasks
    create_whole_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="dataform_run_all",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('dataform_create_compilation_result')['name'] }}"
        },
    )

    # Make sure the compilation result exists before the workflow invocation runs
    create_compilation_result >> create_whole_workflow_invocation

Dataform repository

The Dataform repository is defined in another GitHub repository, let's call it repository-dataform. It has two branches, main and develop. It is connected via GitHub to BigQuery on GCP (cf. the related documentation). It contains SQLX tasks such as example_table.sqlx:

config {
    type: 'table',
    schema: 'dataform'
}

SELECT test FROM UNNEST(["A", "B"]) as test

as well as the dataform.json file (related Dataform documentation) with the following content:

{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "the-database",
  "defaultLocation": "EU",
  "tablePrefix": "stg"
}

and the environments.json file (related dataform documentation) with the content:

    "environments": [
        {
            "name": "staging",
            "gitRef": "develop"
        },
        {
            "name": "production",
            "configOverride": {
                "tablePrefix": "prd"
            },
            "gitRef": "main"
        }
    ]
}

According to the documentation, this environments.json file should be used to override the default configuration set in dataform.json. It is also used in the Dataform BigQuery repository example.

Problem

When running on the staging Airflow, the DAG works well: it creates the tables with the stg table prefix, in our example dataform.stg_example_table. However, when running on the production Airflow, it also creates the table with the stg prefix instead of the prd prefix. I can't find any documentation explaining how to use the environments.json file with Airflow.

Questions

How can I use the environments.json of the remote Dataform repository in an Airflow DAG (through Google Cloud Composer) to override configuration such as the tablePrefix?

I don't want to specify the variables directly in the Airflow DAG as explained in this Stack Overflow question: How do I pass variables from Airflow to (GCP) Dataform? Instead, I want to use the environments.json file defined in repository-dataform.


Solution

  • I contacted Google Support and shared this Stack Overflow post. Here is their response:

    Regarding your question, the GCP version of Dataform does not support (and it is not currently planned to do so) the environments.json configuration file. The documentation and git repository you refer to are those of Dataform which is the historical product (before acquisition by Google cloud and integration into gcp) which today became gcp dataform. You will need to refer to the GCP documentation in the future.
    In order to manage dataform variables, you have no other choice than to use the dataform.json file and override the variables during compilation through the API or the Airflow operator.

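    Concretely, overriding at compilation time means passing a code_compilation_config inside the compilation_result of the standard operator. A minimal sketch (the prd/stg prefixes mirror the environments.json above):

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="dataform_create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main" if env == "production" else "develop",
            # Overrides that environments.json would otherwise have provided
            "code_compilation_config": {
                "table_prefix": "prd" if env == "production" else "stg",
            },
        },
    )
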
    I followed their advice and created a custom Dataform Airflow operator to avoid having to define the compilation result variables each time. It relies on a utility class (UtilEnv) that, based on the environment (production or staging), provides information such as the branch name and table prefix:

    from typing import Optional
    
    from airflow.providers.google.cloud.operators.dataform import (
        DataformCreateCompilationResultOperator,
    )
    from custom_airflow_modules.config import (
        DATAFORM_PRODUCTION_REPOSITORY_ID,
        DATAFORM_STAGING_REPOSITORY_ID,
    )
    from utils.util_env.util_env import UtilEnv
    
    
    def get_repository_id(util_env: UtilEnv):
        return (
            DATAFORM_PRODUCTION_REPOSITORY_ID
            if util_env.is_production()
            else DATAFORM_STAGING_REPOSITORY_ID
        )
    
    
    class CustomDataformCreateCompilationResultOperator(
        DataformCreateCompilationResultOperator
    ):
        def __init__(
            self,
            *args,
            task_id: str,
            util_env: UtilEnv,
            addition_vars_compilation_result: Optional[dict] = None,
            **kwargs,
        ):
            compilation_result = {
                "git_commitish": util_env.to_branch_name(),
                "code_compilation_config": {
                    "table_prefix": util_env.value,
                    "vars": {
                        "env": util_env.value,
                    },
                },
            }
            if addition_vars_compilation_result is not None:
                compilation_result["code_compilation_config"]["vars"].update(
                    addition_vars_compilation_result
                )
    
            super().__init__(
                *args,
                **kwargs,
                task_id=task_id,
                project_id=util_env.get_project_id(),
                region=util_env.get_region(),
                repository_id=get_repository_id(util_env),
                compilation_result=compilation_result,
            )
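
    For completeness, here is roughly how the custom operator can be used in a DAG (a sketch; the UtilEnv constructor and the operator's module path are assumptions based on the code above):

    import os

    from custom_airflow_modules.custom_dataform_operators import (  # hypothetical module path
        CustomDataformCreateCompilationResultOperator,
    )
    from utils.util_env.util_env import UtilEnv

    # Hypothetical constructor: UtilEnv is built from the ENV variable ("production" or "staging")
    util_env = UtilEnv(os.environ["ENV"])

    create_compilation_result = CustomDataformCreateCompilationResultOperator(
        task_id="dataform_create_compilation_result",
        util_env=util_env,
        # Extra vars merged into code_compilation_config["vars"]
        addition_vars_compilation_result={"run_date": "{{ ds }}"},
    )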