kubernetes, amazon-s3, apache-flink

Getting JAR file from S3 using Flink Kubernetes operator


I'm experimenting with the new Flink Kubernetes operator and I've been able to do pretty much everything I need except for one thing: getting a JAR file from the S3 file system.

Context

I have a Flink application running in an EKS cluster in AWS, with all of its state saved in an S3 bucket. Savepoints, checkpoints, high-availability metadata, and JAR files are all stored there.

Saving the savepoints, checkpoints, and high-availability information to the bucket works fine, but when trying to get the JAR file from the same bucket I get the error: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.

I found this thread, but I wasn't able to get the resource fetcher to work correctly. That solution also isn't ideal, so I was looking for a more direct approach.

Deployment files

Here are the files that I'm deploying to the cluster:

deployment.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-deployment
spec:
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.3.jar;flink-s3-fs-hadoop-1.15.3.jar
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          hostPath:
            path: /tmp
            type: Directory
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    state.checkpoints.dir: s3://kubernetes-operator/checkpoints
    state.savepoints.dir: s3://kubernetes-operator/savepoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://kubernetes-operator/ha
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink

session-job.yml

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-session-job
spec:
  deploymentName: flink-deployment
  job:
    jarURI: s3://kubernetes-operator/savepoints/flink.jar
    parallelism: 3
    upgradeMode: savepoint
    savepointTriggerNonce: 0

I'm using version 1.3.1 of the Flink Kubernetes operator.

Is there anything that I'm missing or doing wrong?


Solution

  • The JAR download happens in the flink-kubernetes-operator pod. When you apply a FlinkSessionJob, the operator recognizes the CRD, downloads the JAR from the jarURI location, builds a JobGraph, and submits the session job to the session cluster created by the FlinkDeployment. The operator has Flink running inside it in order to build that JobGraph, so you have to add flink-s3-fs-hadoop-1.15.3.jar at /opt/flink/plugins/s3-fs-hadoop/ inside the flink-kubernetes-operator pod.
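
    To check whether the plugin is already in place, you can list the plugin directory inside the operator pod (a quick sanity check, assuming the default deployment name flink-kubernetes-operator from the operator Helm chart):

        kubectl exec deploy/flink-kubernetes-operator -- ls /opt/flink/plugins/s3-fs-hadoop/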

    You can add the JAR either by extending the ghcr.io/apache/flink-kubernetes-operator image, curling the JAR and copying it into the plugins location, as sketched below,
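
    A minimal Dockerfile sketch of the image-extension approach (the 1.3.1 tag matches the operator version from the question, and the Maven Central URL is the standard coordinate for the plugin JAR; adjust both to your versions):

        # Stage 1: fetch the plugin JAR with curl
        FROM curlimages/curl:latest AS fetch
        RUN curl -fsSL -o /tmp/flink-s3-fs-hadoop-1.15.3.jar \
            https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.3/flink-s3-fs-hadoop-1.15.3.jar

        # Stage 2: extend the operator image and drop the JAR into the plugins directory
        FROM ghcr.io/apache/flink-kubernetes-operator:1.3.1
        COPY --from=fetch /tmp/flink-s3-fs-hadoop-1.15.3.jar /opt/flink/plugins/s3-fs-hadoop/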

    or

    You can write an initContainer which downloads the JAR (here via wget from Maven Central) into a volume and mounts that volume into the operator container:

        volumes:
        - name: s3-plugin
          emptyDir: {}
        initContainers:
        - name: busybox
          image: busybox:latest
          # Download the S3 plugin JAR from Maven Central into the shared volume
          command:
          - wget
          - "-O"
          - /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.15.3.jar
          - https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.15.3/flink-s3-fs-hadoop-1.15.3.jar
          volumeMounts:
          - mountPath: /opt/flink/plugins/s3-fs-hadoop
            name: s3-plugin
        containers:
        - image: 'ghcr.io/apache/flink-kubernetes-operator:95128bf'
          name: flink-kubernetes-operator
          volumeMounts:
          - mountPath: /opt/flink/plugins/s3-fs-hadoop
            name: s3-plugin


    Also, if you are using a service account for S3 authentication, add the config below to your flinkConfiguration:

    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
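
    For example, in the FlinkDeployment's flinkConfiguration from the question, this sits alongside the existing entries (a sketch; it assumes the flink service account is annotated with an IAM role via IRSA):

        flinkConfiguration:
          state.checkpoints.dir: s3://kubernetes-operator/checkpoints
          fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider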