I'm experimenting with the new Flink Kubernetes operator and I've been able to do pretty much everything I need except one thing: getting a JAR file from the S3 file system.
I have a Flink application running in an EKS cluster on AWS, with everything saved in an S3 bucket: savepoints, checkpoints, high-availability metadata, and JAR files are all stored there.
Saving the savepoints, checkpoints, and high-availability information to the bucket works fine, but when I try to fetch the JAR file from the same bucket I get this error:
Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.
I found this thread, but I wasn't able to get the resource fetcher to work correctly. That solution also isn't ideal, and I was looking for a more direct approach.
Here are the files that I'm deploying to the cluster:
deployment.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-deployment
spec:
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.3.jar;flink-s3-fs-hadoop-1.15.3.jar
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            path: /tmp
            type: Directory
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    state.checkpoints.dir: s3://kubernetes-operator/checkpoints
    state.savepoints.dir: s3://kubernetes-operator/savepoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://kubernetes-operator/ha
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
session-job.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-session-job
spec:
  deploymentName: flink-deployment
  job:
    jarURI: s3://kubernetes-operator/savepoints/flink.jar
    parallelism: 3
    upgradeMode: savepoint
    savepointTriggerNonce: 0
The Flink Kubernetes operator version that I'm using is 1.3.1.
Is there anything that I'm missing or doing wrong?
The download of the JAR happens in the flink-kubernetes-operator pod, not in your Flink cluster. When you apply a FlinkSessionJob, the operator recognizes the custom resource, tries to download the JAR from the jarURI location, builds a JobGraph, and submits the session job to the FlinkDeployment. The operator ships with its own Flink distribution to build that JobGraph, so you also have to add flink-s3-fs-hadoop-1.15.3.jar at /opt/flink/plugins/s3-fs-hadoop/ inside the flink-kubernetes-operator pod.
You can add the JAR by extending the ghcr.io/apache/flink-kubernetes-operator image, fetching the JAR and copying it into the plugins location, as in the sketch below,
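A minimal Dockerfile sketch for this first option; the operator tag (1.3.1, matching your operator version) and the Maven Central URL for flink-s3-fs-hadoop 1.15.3 are assumptions you should adjust to your setup:
Dockerfile
# Stage 1: fetch the S3 plugin JAR (busybox ships with wget)
FROM busybox:latest AS downloader
RUN wget -O /flink-s3-fs-hadoop-1.15.3.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.3/flink-s3-fs-hadoop-1.15.3.jar

# Stage 2: place it in the plugin directory scanned by the operator's embedded Flink
FROM ghcr.io/apache/flink-kubernetes-operator:1.3.1
COPY --from=downloader /flink-s3-fs-hadoop-1.15.3.jar /opt/flink/plugins/s3-fs-hadoop/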
or
you can write an initContainer that downloads the JAR into a volume and mount that volume into the operator container:
volumes:
  - name: s3-plugin
    emptyDir: {}
initContainers:
  - name: busybox
    image: busybox:latest
    # fetch the plugin JAR into the shared volume; the Maven Central
    # URL/version here are an example, point this wherever your JAR lives
    command:
      - wget
      - -O
      - /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.15.3.jar
      - https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.3/flink-s3-fs-hadoop-1.15.3.jar
    volumeMounts:
      - mountPath: /opt/flink/plugins/s3-fs-hadoop
        name: s3-plugin
containers:
  - image: 'ghcr.io/apache/flink-kubernetes-operator:95128bf'
    name: flink-kubernetes-operator
    volumeMounts:
      - mountPath: /opt/flink/plugins/s3-fs-hadoop
        name: s3-plugin
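Note that this snippet belongs in the pod spec of the operator's own Deployment (however you manage that, e.g. through the operator Helm chart), not in the FlinkDeployment resource; the exact download command doesn't matter as long as the JAR ends up in the mounted plugin directory.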
Also, if you are using a service account for S3 authentication (IAM roles for service accounts on EKS), add the config below to flinkConfiguration:
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
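For example, alongside the other keys in the FlinkDeployment from your question:
flinkConfiguration:
  state.checkpoints.dir: s3://kubernetes-operator/checkpoints
  state.savepoints.dir: s3://kubernetes-operator/savepoints
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: s3://kubernetes-operator/ha
  fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider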