I am trying to run dbt jobs via Cloud Composer. The big idea is to use the KubernetesPodOperator to execute dbt run.
The DAG is below. I am familiar with Workload Identity, yet for some reason I can't seem to run my dbt workload because of a Runtime Error: "unable to generate access token".
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

with DAG(
    'airflow_k8_dbt_demo',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['leopold.granger@datapilot.fr'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5)
    },
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=['GA4']
) as dag:
    dbt_run = KubernetesPodOperator(
        namespace="k8-executor3",  # a new namespace I created
        service_account_name="composer3",  # a new Kubernetes service account I created
        config_file="/home/airflow/composer_kube_config",
        image="europe-west9-docker.pkg.dev/dataeng-sandbox-datapilot/composer-images-europe-west9-ga4k8podv4-4edb9f24-gke/airflow-k8-dbt-demo:1.0.1",
        cmds=["bash", "-cx"],
        arguments=["dbt run --project-dir dbt_k8_demo"],
        labels={"foo": "bar"},
        name="dbt-run-k8",
        task_id="run_dbt_job_on_k8_demo",
        image_pull_policy="Always",
        get_logs=True,
        dag=dag
    )
    dbt_run.dry_run()
I originally thought I was having issues with Workload Identity, so I followed the steps here to allow pods to authenticate to Google Cloud APIs using Workload Identity. When that didn't work, I changed the namespace to the dedicated "composer-user-workloads" namespace, which apparently has access to Google Cloud resources.
I have checked that my Composer environment has Workload Identity enabled.
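For reference, the Workload Identity wiring I applied followed the standard recipe from the GKE docs, roughly like this (a sketch using my names; PROJECT_ID stands in for the actual project ID):

# Create the Kubernetes service account (KSA) in the pod's namespace
kubectl create serviceaccount composer3 --namespace k8-executor3

# Allow the KSA to impersonate the Google service account (GSA)
gcloud iam service-accounts add-iam-policy-binding \
  composer3@PROJECT_ID.iam.gserviceaccount.com \
  --role roles/iam.workloadIdentityUser \
  --member "serviceAccount:PROJECT_ID.svc.id.goog[k8-executor3/composer3]"

# Annotate the KSA with the GSA it should map to
kubectl annotate serviceaccount composer3 \
  --namespace k8-executor3 \
  iam.gke.io/gcp-service-account=composer3@PROJECT_ID.iam.gserviceaccount.com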
This is the full error message:
*** Reading remote log from gs://europe-west9-ga4k8podv4-4edb9f24-bucket/logs/dag_id=airflow_k8_dbt_demo/run_id=manual__2023-12-11T18:08:33.155924+00:00/task_id=run_dbt_job_on_k8_demo/attempt=1.log.
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1104} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [queued]>
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1104} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [queued]>
[2023-12-11, 18:08:46 UTC] {taskinstance.py:1309} INFO - Starting attempt 1 of 1
[2023-12-11, 18:08:47 UTC] {taskinstance.py:1328} INFO - Executing <Task(KubernetesPodOperator): run_dbt_job_on_k8_demo> on 2023-12-11 18:08:33.155924+00:00
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:57} INFO - Started process 172376 to run task
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'airflow_k8_dbt_demo', 'run_dbt_job_on_k8_demo', 'manual__2023-12-11T18:08:33.155924+00:00', '--job-id', '898', '--raw', '--subdir', 'DAGS_FOLDER/airflow_k8_dbt_demo.py', '--cfg-path', '/tmp/tmpg5fagstz']
[2023-12-11, 18:08:47 UTC] {standard_task_runner.py:85} INFO - Job 898: Subtask run_dbt_job_on_k8_demo
[2023-12-11, 18:08:47 UTC] {task_command.py:414} INFO - Running <TaskInstance: airflow_k8_dbt_demo.run_dbt_job_on_k8_demo manual__2023-12-11T18:08:33.155924+00:00 [running]> on host airflow-worker-r8zh5
[2023-12-11, 18:08:47 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='leopold.granger@datapilot.fr' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='airflow_k8_dbt_demo' AIRFLOW_CTX_TASK_ID='run_dbt_job_on_k8_demo' AIRFLOW_CTX_EXECUTION_DATE='2023-12-11T18:08:33.155924+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-11T18:08:33.155924+00:00'
[2023-12-11, 18:08:47 UTC] {pod.py:973} INFO - Building pod dbt-run-k8-mms8vkif with labels: {'dag_id': 'airflow_k8_dbt_demo', 'task_id': 'run_dbt_job_on_k8_demo', 'run_id': 'manual__2023-12-11T180833.1559240000-b8aec9c3b', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2023-12-11, 18:08:48 UTC] {pod.py:548} INFO - Found matching pod dbt-run-k8-mms8vkif with labels {'airflow_kpo_in_cluster': 'False', 'airflow_version': '2.6.3-composer', 'dag_id': 'airflow_k8_dbt_demo', 'foo': 'bar', 'kubernetes_pod_operator': 'True', 'run_id': 'manual__2023-12-11T180833.1559240000-b8aec9c3b', 'task_id': 'run_dbt_job_on_k8_demo', 'try_number': '1'}
[2023-12-11, 18:08:48 UTC] {pod.py:549} INFO - `try_number` of task_instance: 1
[2023-12-11, 18:08:48 UTC] {pod.py:550} INFO - `try_number` of pod: 1
[2023-12-11, 18:08:48 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:49 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:50 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:51 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:52 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:08:53 UTC] {pod_manager.py:349} WARNING - Pod not yet started: dbt-run-k8-mms8vkif
[2023-12-11, 18:09:01 UTC] {pod_manager.py:431} INFO - [base] + dbt run --project-dir dbt_k8_demo
[2023-12-11, 18:09:02 UTC] {pod_manager.py:431} INFO - [base] 18:09:01 target not specified in profile 'dbt_k8_demo', using 'default'
[2023-12-11, 18:09:03 UTC] {pod_manager.py:431} INFO - [base] 18:09:02 Running with dbt=1.0.4
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:03 Partial parse save file not found. Starting full parse.
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 188 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] 18:09:06 Encountered an error:
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] Runtime Error
[2023-12-11, 18:09:06 UTC] {pod_manager.py:431} INFO - [base] Unable to generate access token, if you're using impersonate_service_account, make sure your initial account has the "roles/iam.serviceAccountTokenCreator" role on the account you are trying to impersonate.
2023-12-11T18:09:06.569109820Z
[2023-12-11, 18:09:07 UTC] {pod_manager.py:444} INFO - [base] ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/composer3@dataeng-datapilot.iam.gserviceaccount.com/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fiam from the Google Compute Engine metadata service. Status: 404 Response:\nb'Unable to generate access token; IAM returned 404 Not Found: Not found; Gaia id not found for email composer3@dataeng-datapilot.iam.gserviceaccount.com\\n'", <google.auth.transport.requests._Response object at 0x7ff528226d00>)
[2023-12-11, 18:09:07 UTC] {pod_manager.py:468} WARNING - Follow requested but pod log read interrupted and container base still running
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06 Found 2 models, 4 tests, 0 snapshots, 0 analyses, 188 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] 18:09:06 Encountered an error:
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] Runtime Error
[2023-12-11, 18:09:08 UTC] {pod_manager.py:431} INFO - [base] Unable to generate access token, if you're using impersonate_service_account, make sure your initial account has the "roles/iam.serviceAccountTokenCreator" role on the account you are trying to impersonate.
2023-12-11T18:09:06.569109820Z
[2023-12-11, 18:09:08 UTC] {pod_manager.py:444} INFO - [base] ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/composer3@dataeng-datapilot.iam.gserviceaccount.com/token?scopes=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fiam from the Google Compute Engine metadata service. Status: 404 Response:\nb'Unable to generate access token; IAM returned 404 Not Found: Not found; Gaia id not found for email composer3@dataeng-datapilot.iam.gserviceaccount.com\\n'", <google.auth.transport.requests._Response object at 0x7ff528226d00>)
[2023-12-11, 18:09:08 UTC] {pod_manager.py:571} INFO - Pod dbt-run-k8-mms8vkif has phase Running
[2023-12-11, 18:09:10 UTC] {pod.py:837} INFO - Deleting pod: dbt-run-k8-mms8vkif
[2023-12-11, 18:09:11 UTC] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 592, in execute
    return self.execute_sync(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 632, in execute_sync
    self.cleanup(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 765, in cleanup
    raise AirflowException(
airflow.exceptions.AirflowException: Pod dbt-run-k8-mms8vkif returned a failure.
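For context: the error shows dbt asking the GKE metadata server for a token, which is what the BigQuery adapter does when the profile relies on ambient credentials (method: oauth), and the scopes=...auth/iam in the failed request suggests service account impersonation is involved. My profiles.yml is along these lines (a sketch; the dataset, threads, and location values are assumptions, and there is no target key, which explains the "target not specified in profile 'dbt_k8_demo', using 'default'" line above):

dbt_k8_demo:
  outputs:
    default:
      type: bigquery
      method: oauth          # ambient credentials, i.e. the token minted by the metadata server
      project: dataeng-sandbox-datapilot
      dataset: dbt_k8_demo   # assumed
      threads: 4             # assumed
      location: europe-west9 # assumed
      # likely also set, given the token request in the log:
      # impersonate_service_account: composer3@dataeng-datapilot.iam.gserviceaccount.com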
As is often the case with this kind of GCP/Cloud Composer project, the issue was with Workload Identity. Following this page https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity, I had incorrectly annotated the Kubernetes service account with the Google service account; specifically, I did not use this command correctly:
kubectl annotate serviceaccount KSA_NAME \
--namespace NAMESPACE \
iam.gke.io/gcp-service-account=GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com
This led to an issue where the metadata server could not mint a token for the Google service account (hence the 404 in the log: "Gaia id not found for email composer3@dataeng-datapilot.iam.gserviceaccount.com"), so the Kubernetes service account did not have permission to run my BigQuery transformations.
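A quick way to catch this is to verify the mapping before running the DAG (same names as above):

# Show which GSA the KSA is annotated with
kubectl get serviceaccount composer3 \
  --namespace k8-executor3 \
  -o jsonpath='{.metadata.annotations.iam\.gke\.io/gcp-service-account}'

# From a pod running as that KSA, confirm the metadata server resolves the GSA
curl -H "Metadata-Flavor: Google" \
  "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/email"

With the annotation pointing at a Google service account that actually exists (and that is bound with roles/iam.workloadIdentityUser as shown earlier), the token request succeeds.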