Airflow's KubernetesPodOperator provides an init_containers parameter, with which you can specify Kubernetes init containers. However, init_containers expects a list of kubernetes.client.models.V1Container objects, and I don't see any way to pass Airflow context (or XComs) to these containers. Is that possible?
Context: I want to use git-sync and kaniko to build an image from a private repo. Kaniko doesn't support cloning using a deploy key, so I want to use git-sync as an init container to clone the repo for the kaniko pod. However, I need a way to pass the correct URL and key from Airflow to the init container. I can imagine a solution that uses pod_mutation_hook to extract context from the main pod, but that seems inelegant.
Here is what I would like to do. Note that my use of the Kubernetes API may be inexact; the issue I'm trying to solve is templating into the 'GIT_SYNC_REPO' env variable.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

cloner = k8s.V1Container(
    name="clone-repo",
    image="k8s.gcr.io/git-sync-amd64:v2.0.6",
    env=[
        # Problem: No templating occurs on this line.
        k8s.V1EnvVar(name="GIT_SYNC_REPO", value="{{ dag_run.conf['repo_url'] }}"),
        k8s.V1EnvVar(name="GIT_SYNC_SSH", value="true"),
        k8s.V1EnvVar(name="GIT_SYNC_ONE_TIME", value="true"),
    ],
    ...
)

build_container = KubernetesPodOperator(
    name="create_container_name",
    task_id="create_container",
    image="gcr.io/kaniko-project/executor:latest",
    init_containers=[cloner],
    image_pull_secrets=[ecr_image_pull_secret],
    arguments=[
        '--context=path/to/volume/containing/cloned/repo',
        '--destination={{ ti.xcom_pull(task_ids="Setup")["sandbox_container_url"] }}',
        "--cache=true",
    ],
    secrets=[ecr_rw_credentials],
    dag=dag,
    service_account_name="airflow-worker",
)
Airflow does not template every string in a DAG module. Instead, each operator defines a class-level attribute called template_fields, and Airflow only renders the instance attributes whose names are listed there.
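As a rough illustration of that rule, here is a toy model in plain Python. It is not Airflow's implementation: ToyOperator, the render helper, and the url context key are made up for this sketch, and real Airflow uses Jinja with a much richer context. It only shows that an attribute named in template_fields gets rendered while an identical string in an unlisted attribute is left alone.

```python
import re
from typing import Any, Mapping, Sequence


def render(template: str, context: Mapping[str, Any]) -> str:
    """Toy stand-in for Jinja: replace each {{ name }} with context[name]."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


class ToyOperator:
    # Names of the instance attributes that get rendered (hypothetical operator).
    template_fields: Sequence[str] = ("arguments",)

    def __init__(self, arguments: str, name: str) -> None:
        self.arguments = arguments
        self.name = name

    def render_template_fields(self, context: Mapping[str, Any]) -> None:
        # Look up each listed attribute and overwrite it with its rendered value.
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


op = ToyOperator(arguments="--destination={{ url }}", name="{{ url }}")
op.render_template_fields({"url": "registry.example/app"})

print(op.arguments)  # listed in template_fields, so it was rendered
print(op.name)       # not listed, so the placeholder is still there
```

Note that rendering reads and writes attributes on the instance, so a templated field has to exist as an attribute with the same name.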
Therefore, to add templating to a field that isn't in template_fields, you can subclass an existing operator:
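import typing as tp

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s


class BuildImage(KubernetesPodOperator):
    # Extend the parent's templated fields instead of replacing them,
    # so that arguments, image, etc. are still rendered.
    template_fields: tp.Sequence[str] = (*KubernetesPodOperator.template_fields, "repo_url")

    def __init__(self, repo_url: str, **kwargs):
        super().__init__(**kwargs)
        # Must be stored under the name listed in template_fields;
        # Airflow renders this attribute before execute() is called.
        self.repo_url = repo_url

    def execute(self, context):
        # Build the init container here, once repo_url has been rendered.
        cloner = k8s.V1Container(
            name="clone-repo",
            image="k8s.gcr.io/git-sync-amd64:v2.0.6",
            env=[
                k8s.V1EnvVar(name="GIT_SYNC_REPO", value=self.repo_url),
                k8s.V1EnvVar(name="GIT_SYNC_SSH", value="true"),
                k8s.V1EnvVar(name="GIT_SYNC_ONE_TIME", value="true"),
            ],
        )
        self.init_containers = [cloner]
        return super().execute(context)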
Then use it in a dag like so:
builder = BuildImage(repo_url="{{ dag_run.conf['repo_url'] }}")
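One thing to watch with this pattern is timing: the templated attribute is rendered after the constructor has run and before execute is called, so anything built from the raw argument inside __init__ keeps the unrendered placeholder. The sketch below is a plain-Python toy, not Airflow code; ToyBase, EagerBuild, LazyBuild, the render helper, and the simplified {{ repo_url }} placeholder are all assumptions made for the illustration.

```python
import re
from typing import Any, Dict, Mapping, Sequence


def render(template: str, context: Mapping[str, Any]) -> str:
    """Toy stand-in for Jinja: replace each {{ name }} with context[name]."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


class ToyBase:
    template_fields: Sequence[str] = ("repo_url",)

    def render_template_fields(self, context: Mapping[str, Any]) -> None:
        # Overwrite each listed attribute with its rendered value.
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))


class EagerBuild(ToyBase):
    def __init__(self, repo_url: str) -> None:
        self.repo_url = repo_url
        # Built in the constructor: copies the still-unrendered string.
        self.env = {"GIT_SYNC_REPO": repo_url}

    def execute(self) -> Dict[str, str]:
        return self.env


class LazyBuild(ToyBase):
    def __init__(self, repo_url: str) -> None:
        self.repo_url = repo_url

    def execute(self) -> Dict[str, str]:
        # Built at execution time: reads the rendered attribute.
        return {"GIT_SYNC_REPO": self.repo_url}


context = {"repo_url": "git@example.com:org/repo.git"}
results = {}
for cls in (EagerBuild, LazyBuild):
    op = cls(repo_url="{{ repo_url }}")
    op.render_template_fields(context)  # runs after __init__, before execute
    results[cls.__name__] = op.execute()["GIT_SYNC_REPO"]

print(results)
```

Only the variant that reads the attribute inside execute ends up with the real URL, which is why it is safer to assemble the init container at execution time.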