I'm using Flink Kubernetes Operator 1.3.0 and need to pass some environment variables to a Python job. I have followed the official documentation and the example runs fine. How can I inject environment variables so that I can use them inside the Python file?
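For example, I'd like to be able to do something like this in the job script (MY_VAR is just a placeholder for whatever gets injected):

import os

my_var = os.environ["MY_VAR"]  # hypothetical variable name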
EDIT:
Here's the YAML file I've used. It's straight from the example linked above:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: localhost:32000/flink-python-example:1.16.0
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless
As you can see, it's a custom resource of kind FlinkDeployment. And here's the Python code:
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen'
        )""")

    t_env.execute_sql("""
        CREATE TABLE print_table WITH ('connector' = 'print')
            LIKE orders""")

    t_env.execute_sql("""
        INSERT INTO print_table SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    python_demo()
Found the solution.

This is not detailed in the reference (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/reference) or in the pod template example (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/pod-template/). But the JobManagerSpec section of the reference (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/custom-resource/reference/#jobmanagerspec) says:

    JobManager pod template. It will be merged with FlinkDeploymentSpec.podTemplate

So I added an envFrom entry to the jobManager pod template, following the pod template example that shows how to extend the FlinkDeployment CRD. Confirmed this is working, as I had to get it working for my own application:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-example
spec:
  image: localhost:32000/flink-python-example:1.16.0
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
      spec:
        serviceAccount: flink
        containers:
          # Do not change the main container name
          - name: flink-main-container
            envFrom:
              - secretRef:
                  name: <SECRET RESOURCE NAME>
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.0.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/python_demo.py"]
    parallelism: 1
    upgradeMode: stateless
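Once the secret's keys are exposed as environment variables in flink-main-container, a plain os.environ lookup works inside the Python entry point. Here's a minimal sketch of reading one in python_demo(); ROWS_PER_SECOND is a hypothetical key, substitute whatever your secret actually contains:

import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def python_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # ROWS_PER_SECOND is a made-up key; envFrom exposes every key in the
    # secret as an environment variable of the same name.
    rate = os.environ.get("ROWS_PER_SECOND", "1")

    t_env.execute_sql(f"""
        CREATE TABLE orders (
            order_number BIGINT,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '{rate}'
        )""")

Note that the template above sits under jobManager, which is where the PythonDriver entry point runs in application mode. If task-side code (for example Python UDFs executing on the TaskManagers) also needs the variables, add the same podTemplate under taskManager as well, or move it to the top-level spec.podTemplate, which gets merged into both.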