Hello, I'm receiving the following error and I don't understand why. I believe it has to do with the service account, but the service account should have the Editor permission.
Objective
Run an R script using the BashOperator, via a Docker image that has R installed.
Error
{kubernetes_pod.py:872} ERROR - 'NoneType' object has no attribute 'metadata'
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'd338b499-5979-4d14-9ceb-e17b91dd7577', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'e2983b55-6f40-4b09-a2f5-6b07f7b3cd46', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'a1a2f4b4-f381-49a9-9edb-781b6cb62cdf', 'Date': 'Thu, 02 Nov 2023 23:42:04 GMT', 'Content-Length': '314'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods is forbidden: User \"system:serviceaccount:composer-1-20-12-airflow-2-4-3-a5161aba:default\" cannot list resource \"pods\" in API group \"\" in the namespace \"default\"","reason":"Forbidden","details":{"kind":"pods"},"code":403}
Code
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os

# Get current directory
cwd = os.getcwd()

# sudo apt-get install r-base-core
default_dag_args = {
    # Initial run date
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 31),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG(
    'prueba-bash-gcs',
    default_args=default_dag_args,
    schedule_interval=None,  # DAG run interval (how often the DAG runs)
)
kubernetes_min_pod = KubernetesPodOperator(
    task_id="test-bash-r-k",
    dag=dag,
    name="test-bash-r",
    cmds=["echo"],
    namespace="default",
    image="rpy2/base-ubuntu",
)

test_bash = BashOperator(
    task_id='test-bash-r',
    dag=dag,
    # bash_command = f'Rscript <URI to script>/prueba-bash.r'
    bash_command='Rscript -e \'a <- 3+3; cat(a, "\n")\'',
)

kubernetes_min_pod >> test_bash
If you want to deploy workloads using the KubernetesPodOperator within Cloud Composer, you have to set the namespace to composer-user-workloads. This is due to how GCP has set up the Airflow cluster: they have removed the ability for Airflow to deploy into the default namespace.
More information can be found in the Cloud Composer documentation on the KubernetesPodOperator. There is also a Stack Overflow thread about this specific issue that goes into more detail.
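For illustration, here is a minimal sketch of the question's KubernetesPodOperator with only the namespace swapped to composer-user-workloads; it assumes the imports and the dag object from the original DAG above, and depending on your Composer setup you may additionally need to point the operator at the environment's cluster (for example via config_file or kubernetes_conn_id, as the Composer documentation shows).

kubernetes_min_pod = KubernetesPodOperator(
    task_id="test-bash-r-k",
    dag=dag,
    name="test-bash-r",
    cmds=["echo"],
    # Composer blocks deployments into "default"; use the user-workloads namespace instead
    namespace="composer-user-workloads",
    image="rpy2/base-ubuntu",
)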