I have an existing data pipeline in Airflow (through Google Cloud Composer) in which I execute Python and SQL-based tasks.
The Airflow pipeline is defined in a GitHub repository, let's call it repository-airflow.
Here is the DAG:
import datetime
import os
from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

from utils.slack_alerts import task_fail_slack_alert

# Default arguments for the DAG
DEFAULT_ARGS = {
    "retries": 2,
    "retry_delay": datetime.timedelta(minutes=1),
    "on_failure_callback": task_fail_slack_alert,
}

# Environment variable of the DAG
env = os.environ["ENV"]

# Create the DAG
with DAG(
    dag_id=f"dataform_test_{env}",
    start_date=datetime.datetime(2023, 12, day=4, tzinfo=datetime.timezone.utc),
    end_date=None,
    dagrun_timeout=timedelta(hours=24),
    schedule="@once",
    default_args=DEFAULT_ARGS,
    catchup=False,
) as dag:
    PROJECT_ID = "my_project_id"
    REGION = "europe-west1"
    REPOSITORY_ID = "my_repository_id"

    # Create the compilation result
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="dataform_create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main" if env == "production" else "develop"
        },
    )

    # Run all tasks
    create_whole_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="dataform_run_all",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('dataform_create_compilation_result')['name'] }}"
        },
    )

    # The workflow invocation pulls the compilation result via XCom,
    # so it must run after it.
    create_compilation_result >> create_whole_workflow_invocation
The Dataform repository is defined in another GitHub repository, let's call it repository-dataform. It has two branches, main and develop. It is connected via GitHub to BigQuery on GCP (cf. the related documentation).
It contains the SQLX tasks, such as example_table.sqlx:
config {
  type: 'table',
  schema: 'dataform'
}

SELECT test FROM UNNEST(["A", "B"]) AS test
as well as the dataform.json file (see the related Dataform documentation) with the following content:
{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "the-database",
  "defaultLocation": "EU",
  "tablePrefix": "stg"
}
and the environments.json file (see the related Dataform documentation) with the following content:
"environments": [
{
"name": "staging",
"gitRef": "develop"
},
{
"name": "production",
"configOverride": {
"tablePrefix": "prd"
},
"gitRef": "main"
}
]
}
According to the documentation, this environments.json file should be used to override the default configuration set in dataform.json. It is also used in the Dataform BigQuery repository example.
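For reference, the effective configuration I would expect for the production environment is the dataform.json above with its tablePrefix overridden by environments.json (illustrative only, derived from the two files above):

{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "the-database",
  "defaultLocation": "EU",
  "tablePrefix": "prd"
}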
When running on the staging Airflow, the DAG works well: it creates the tables with the table prefix stg, in our example dataform.stg_example_table. However, when running on the production Airflow, it also creates tables with the stg prefix instead of the prd prefix. I can't find any documentation that explains how to use the environments.json file with Airflow.
How can I use the environments.json of the remote Dataform repository in an Airflow DAG (through Google Cloud Composer) to override configuration such as the tablePrefix?
I don't want to specify the variables directly in the Airflow DAG as explained in this Stack Overflow question: How do I pass variables from Airflow to (GCP) Dataform? Instead, I want to use the environments.json file defined in repository-dataform.
I contacted Google Support and shared this Stack Overflow post. Here is their response:
Regarding your question, the GCP version of Dataform does not support (and is not currently planned to support) the environments.json configuration file. The documentation and git repository you refer to are those of Dataform, the historical product (before its acquisition by Google Cloud and integration into GCP), which has now become GCP Dataform. You will need to refer to the GCP documentation in the future.
In order to manage Dataform variables, you have no choice other than to use the dataform.json file and override the variables during compilation through the API or the Airflow operator.
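To make that advice concrete, here is a minimal sketch of overriding the dataform.json configuration at compilation time through the stock operator. It is meant to live inside the with DAG(...) block shown at the top (reusing its PROJECT_ID, REGION and REPOSITORY_ID constants); the prd prefix and env variable are example overrides, not a definitive setup:

# Minimal sketch: override dataform.json at compilation time.
# Placed inside the `with DAG(...)` block above, reusing its constants.
create_production_compilation_result = DataformCreateCompilationResultOperator(
    task_id="dataform_create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        # code_compilation_config overrides values from dataform.json.
        "code_compilation_config": {
            "table_prefix": "prd",          # example override of tablePrefix
            "vars": {"env": "production"},  # example compilation variable
        },
    },
)

This is the same pattern the custom operator below wraps, so that the environment-specific values come from UtilEnv instead of being hard-coded.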
I followed their advice and created a custom Dataform Airflow operator to avoid having to define the compilation result variables each time. I created a util (UtilEnv) that, based on the environment (production or staging), gives some information (brand name, table prefix, etc.):
from typing import Optional

from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
)

from custom_airflow_modules.config import (
    DATAFORM_PRODUCTION_REPOSITORY_ID,
    DATAFORM_STAGING_REPOSITORY_ID,
)
from utils.util_env.util_env import UtilEnv


def get_repository_id(util_env: UtilEnv) -> str:
    """Return the Dataform repository ID matching the environment."""
    return (
        DATAFORM_PRODUCTION_REPOSITORY_ID
        if util_env.is_production()
        else DATAFORM_STAGING_REPOSITORY_ID
    )


class CustomDataformCreateCompilationResultOperator(
    DataformCreateCompilationResultOperator
):
    """Compilation result operator that derives its configuration from the environment."""

    def __init__(
        self,
        *args,
        task_id: str,
        util_env: UtilEnv,
        addition_vars_compilation_result: Optional[dict] = None,
        **kwargs,
    ):
        # Build the compilation result from the environment: branch to compile,
        # table prefix and the "env" compilation variable.
        compilation_result = {
            "git_commitish": util_env.to_branch_name(),
            "code_compilation_config": {
                "table_prefix": util_env.value,
                "vars": {
                    "env": util_env.value,
                },
            },
        }
        # Merge any additional compilation variables passed by the caller.
        if addition_vars_compilation_result is not None:
            compilation_result["code_compilation_config"]["vars"].update(
                addition_vars_compilation_result
            )
        super().__init__(
            *args,
            **kwargs,
            task_id=task_id,
            project_id=util_env.get_project_id(),
            region=util_env.get_region(),
            repository_id=get_repository_id(util_env),
            compilation_result=compilation_result,
        )
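For completeness, here is a hypothetical usage sketch of the custom operator inside a with DAG(...) block. How UtilEnv is instantiated from the ENV environment variable, and the extra run_type variable, are assumptions for illustration, since the UtilEnv implementation is not shown here:

import os

# Assumption: UtilEnv can be built from the ENV environment variable;
# its actual constructor is not shown in this post.
util_env = UtilEnv(os.environ["ENV"])

create_compilation_result = CustomDataformCreateCompilationResultOperator(
    task_id="dataform_create_compilation_result",
    util_env=util_env,
    # Hypothetical extra variable, merged into code_compilation_config["vars"].
    addition_vars_compilation_result={"run_type": "full"},
)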