I was trying to enable checkpointing to S3 using the FsStateBackend, but somehow I was getting the following error:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

Flink version: I am using Flink 1.10.0.

I have found the solution for the above issue, so here I am listing the required steps.
First, add the following configuration to the flink-conf.yaml file, which I have listed below:

state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ # "s3://<your-bucket>/<path>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ # "s3://<your-bucket>/<path>"
s3.access-key: XXXXXXXXXXXXXXXXXXX #your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #your-secret-key
s3.endpoint: http://127.0.0.1:9000 #your-endpoint-hostname (I have used Minio)
After completing the first step, we need to copy the respective JAR files (flink-s3-fs-hadoop-1.10.0.jar and flink-s3-fs-presto-1.10.0.jar) from the opt directory to the plugins directory of your Flink installation.
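The copy step can be sketched in shell. Note this is just a sketch: it fabricates a scratch FLINK_HOME with empty placeholder JARs so it is runnable anywhere; point FLINK_HOME at your real installation (e.g. /flink-1.10.0) instead.

```shell
# Scratch layout standing in for a real Flink 1.10.0 install (assumption
# for the sketch; replace with FLINK_HOME=/flink-1.10.0 in practice).
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.10.0.jar" \
      "$FLINK_HOME/opt/flink-s3-fs-presto-1.10.0.jar"

# Each plugin must live in its own subdirectory under plugins/, because
# Flink loads every plugin subdirectory with an isolated classloader.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop" "$FLINK_HOME/plugins/s3-fs-presto"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.10.0.jar" "$FLINK_HOME/plugins/s3-fs-hadoop/"
cp "$FLINK_HOME/opt/flink-s3-fs-presto-1.10.0.jar" "$FLINK_HOME/plugins/s3-fs-presto/"
```

Restart the cluster after copying so the plugins are picked up.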
/flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar
to /flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar // Recommended for StreamingFileSink

/flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar
to /flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar // Recommended for checkpointing

Then add this in your checkpointing code:
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"))
Note: If you are using both plugins (flink-s3-fs-hadoop and flink-s3-fs-presto) in Flink, then please use s3p:// specifically for flink-s3-fs-presto and s3a:// for flink-s3-fs-hadoop instead of s3://.
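For example, with both plugins installed, the flink-conf.yaml entries from the first step would use the explicit Presto scheme for checkpoints (a sketch reusing the bucket name from above):

```yaml
state.backend: filesystem
# s3p:// routes explicitly through flink-s3-fs-presto
state.checkpoints.dir: s3p://s3-bucket/checkpoints/
```

Likewise, a StreamingFileSink path would use s3a:// so it is served by flink-s3-fs-hadoop.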