kubernetes apache-flink pyflink

Passing environment variables to a Flink job on a Flink Kubernetes cluster


I'm using Flink Kubernetes Operator 1.3.0 and need to pass some environment variables to a Python job. I followed the official documentation, and the example runs fine. How can I inject environment variables so that I can read them inside the Python file?

EDIT:

Here's the YAML file that I've used. It's taken straight from the example in the official documentation:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: localhost:32000/flink-python-example:1.16.0
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless

As you can see, it's a custom resource of kind FlinkDeployment. And here's the Python code:

import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql("""
    CREATE TABLE orders (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    ) WITH (
      'connector' = 'datagen'
    )""")

    t_env.execute_sql("""
        CREATE TABLE print_table WITH ('connector' = 'print')
          LIKE orders""")
    t_env.execute_sql("""
        INSERT INTO print_table SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()

Solution

  • Found the solution.

    This is not spelled out in the reference: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/reference

    or in the example FlinkDeployment on the pod template page: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/pod-template/

    But the JobManagerSpec section of the reference (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/reference/#jobmanagerspec) describes the podTemplate field as: "JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate."

    So I added a podTemplate with envFrom to the jobManager spec, following the pod template example, which shows how to customize the pods of a FlinkDeployment:

    https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/pod-template/

    I can confirm this works, as I had to get it working for my own application:

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: python-example
    spec:
      image: localhost:32000/flink-python-example:1.16.0
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: pod-template
          spec:
            serviceAccount: flink
            containers:
              # Do not change the main container name
              - name: flink-main-container
                envFrom:
                - secretRef:
                    name: <SECRET RESOURCE NAME>
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar # Note, this jarURI is actually a placeholder
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
        parallelism: 1
        upgradeMode: stateless
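
With envFrom and a secretRef, each key in the Secret becomes one environment variable in flink-main-container, so the Python file can read it through os.environ. Below is a minimal sketch of that, not a definitive implementation. The variable names DB_PASSWORD and ROWS_PER_SECOND and the two helper functions are hypothetical; substitute the keys that actually exist in your Secret. Also note that the podTemplate above is set on jobManager only. As far as I understand, that covers the code in the -py script itself, but Python UDFs running on TaskManagers would likely need the same template under taskManager or in the top-level spec.podTemplate.

```python
import os


def read_job_settings(environ=os.environ):
    """Read settings that Kubernetes injected into the container as environment variables.

    DB_PASSWORD and ROWS_PER_SECOND are hypothetical names used for illustration;
    with envFrom/secretRef, each key of the Secret becomes one variable.
    """
    return {
        # Required: fail fast with a KeyError if the variable was not injected.
        "db_password": environ["DB_PASSWORD"],
        # Optional: fall back to a default when the variable is absent.
        "rows_per_second": int(environ.get("ROWS_PER_SECOND", "10")),
    }


def build_orders_ddl(rows_per_second):
    """Build the datagen table DDL from the question, with a configurable rate."""
    return f"""
    CREATE TABLE orders (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '{rows_per_second}'
    )"""
```

Inside python_demo() you could then call settings = read_job_settings() and pass build_orders_ddl(settings["rows_per_second"]) to t_env.execute_sql in place of the hard-coded CREATE TABLE statement. Accepting the environment as a parameter keeps the function easy to test with a plain dict.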