I want to configure the filesystem state backend and ZooKeeper recovery mode:
state.backend: filesystem
state.backend.fs.checkpointdir: ???
recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???
As you can see, I have to specify the checkpointdir and storageDir parameters, but I don't have any file system supported by Apache Flink (such as HDFS or Amazon S3). I do, however, have a Riak CS cluster installed (it seems to be S3-compatible).
So, can I use Riak CS together with Apache Flink? If so, how do I configure Apache Flink to work with Riak CS?
Answer: How to join Apache Flink and Riak CS?
Riak CS has an S3 (version 2) compatible interface, so it is possible to use Hadoop's S3 file system adapter to work with Riak CS.
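Assuming "version 2" refers to S3 Signature Version 2 request signing (the signature_v2 setting for s3cmd later in this answer points that way), the following sketch shows what such a client computes for each request. It is an illustration of the V2 scheme only; the function names are hypothetical and this is not how Hadoop or Riak CS is implemented internally.

```python
import base64
import hashlib
import hmac


def sign_v2(secret_key: str, verb: str, date: str, resource: str,
            content_md5: str = "", content_type: str = "",
            amz_headers: str = "") -> str:
    """Return the base64-encoded HMAC-SHA1 signature of S3 Signature V2.

    amz_headers must already be canonicalized (lowercase x-amz-* headers,
    sorted, each line ending in a newline); it is empty for simple requests.
    """
    # StringToSign = Verb \n Content-MD5 \n Content-Type \n Date \n AmzHeaders + Resource
    string_to_sign = (
        f"{verb}\n{content_md5}\n{content_type}\n{date}\n"
        f"{amz_headers}{resource}"
    )
    digest = hmac.new(secret_key.encode("utf-8"),
                      string_to_sign.encode("utf-8"),
                      hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def authorization_header(access_key: str, signature: str) -> str:
    # V2 requests send "AWS <access key>:<signature>" in the Authorization header
    return f"AWS {access_key}:{signature}"
```

For example, `authorization_header("AK", sign_v2("SK", "GET", "Tue, 27 Mar 2007 19:36:42 +0000", "/example-staging-flink/"))` yields the header value a V2 client would send when listing that bucket.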
I don't know why, but Apache Flink bundles only some of the Hadoop filesystem adapters inside its fat jar (lib/flink-dist_2.11-1.0.1.jar): it has the FTP file system (org.apache.hadoop.fs.ftp.FTPFileSystem) but not the S3 file system (org.apache.hadoop.fs.s3a.S3AFileSystem). So you have 2 ways to solve this problem:
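1. Provision Hadoop in your environment.
2. Copy the required JARs into the <flink home>/lib directory.
I chose the second way because I don't want to provision Hadoop in my environment. You can copy the JARs from a Hadoop distribution or download them from the internet: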
curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
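The URLs in the curl commands follow the standard Maven repository layout, so they can be generated from the artifact coordinates. A minimal sketch; the helper names are hypothetical, and the default base URL is an assumption: Maven Central now requires HTTPS, and https://repo1.maven.org/maven2 is assumed here as its current address in place of the central.maven.org host used above.

```python
from typing import List, NamedTuple

# Assumption: current Maven Central base URL (HTTPS only)
MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


class Artifact(NamedTuple):
    group: str
    artifact: str
    version: str


def jar_url(a: Artifact, base: str = MAVEN_CENTRAL) -> str:
    """<base>/<group with dots as slashes>/<artifact>/<version>/<artifact>-<version>.jar"""
    group_path = a.group.replace(".", "/")
    return f"{base}/{group_path}/{a.artifact}/{a.version}/{a.artifact}-{a.version}.jar"


# The four JARs downloaded above (versions matching Hadoop 2.7.2)
S3A_JARS = [
    Artifact("org.apache.hadoop", "hadoop-aws", "2.7.2"),
    Artifact("com.amazonaws", "aws-java-sdk", "1.7.4"),
    Artifact("org.apache.httpcomponents", "httpcore", "4.2.5"),
    Artifact("org.apache.httpcomponents", "httpclient", "4.2.5"),
]


def curl_commands(lib_dir: str = "/flink/lib", base: str = MAVEN_CENTRAL) -> List[str]:
    # One curl command per JAR, saving it into Flink's lib directory
    return [
        f"curl {jar_url(a, base)} -o {lib_dir}/{a.artifact}-{a.version}.jar"
        for a in S3A_JARS
    ]
```

Passing `base="http://central.maven.org/maven2"` reproduces the original commands exactly.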
As you can see, I am using old versions because these are the versions used by Hadoop 2.7.2, and I use the Flink build that is compatible with that Hadoop version.
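Because a JAR is a ZIP archive with classes stored as `path/to/Name.class`, you can check which Hadoop filesystem classes a given JAR provides, for example that flink-dist contains FTPFileSystem but not S3AFileSystem, or that the copied hadoop-aws JAR does contain it. A small sketch with hypothetical helper names:

```python
import zipfile

S3A_CLASS = "org.apache.hadoop.fs.s3a.S3AFileSystem"
FTP_CLASS = "org.apache.hadoop.fs.ftp.FTPFileSystem"


def class_entry(class_name: str) -> str:
    # org.example.Foo -> org/example/Foo.class (the entry name inside a JAR)
    return class_name.replace(".", "/") + ".class"


def jar_contains(jar, class_name: str) -> bool:
    """jar is a file path or a binary file object of a JAR/ZIP archive."""
    with zipfile.ZipFile(jar) as archive:
        return class_entry(class_name) in archive.namelist()
```

Usage: `jar_contains("/flink/lib/flink-dist_2.11-1.0.1.jar", S3A_CLASS)` should be `False` per the observation above, while `jar_contains("/flink/lib/hadoop-aws-2.7.2.jar", S3A_CLASS)` should be `True`.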
FYI: this hack can cause problems if your own job uses newer versions of these JARs. To avoid conflicts between versions, you can relocate packages when building the job's fat jar, using something like this (I am using Gradle):
// Relocate org.apache.http packages because Apache Flink's lib directory now includes an old version of this library (we placed it there to use the S3-compatible FS)
shadowJar {
    dependencies {
        include(dependency('.*:.*:.*'))
    }
    relocate 'org.apache.http', 'relocated.org.apache.http'
    relocate 'org.apache.commons', 'relocated.org.apache.commons'
}
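Relocation works because the Shadow plugin moves the bundled classes under the new prefix and rewrites the job's references to them, so the job uses its own copies while Flink keeps loading the old org.apache.http classes from its lib directory. The sketch below is only a simplified model of the renaming rule (hypothetical function, matching whole package prefixes); the plugin's actual matching rules and bytecode rewriting are not reproduced here.

```python
from typing import Dict

RULES: Dict[str, str] = {
    "org.apache.http": "relocated.org.apache.http",
    "org.apache.commons": "relocated.org.apache.commons",
}


def relocate(class_name: str, rules: Dict[str, str]) -> str:
    """Rewrite a fully qualified class name whose package starts with a relocated prefix."""
    for prefix, target in rules.items():
        # Match on package boundaries so that "org.apache.httpx" is left alone
        if class_name == prefix or class_name.startswith(prefix + "."):
            return target + class_name[len(prefix):]
    return class_name
```

For instance, `org.apache.http.client.HttpClient` becomes `relocated.org.apache.http.client.HttpClient`, while `org.apache.hadoop.fs.s3a.S3AFileSystem` is untouched.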
Then you should specify the path to core-site.xml in flink-conf.yaml, because Hadoop-compatible file systems load their settings from this config:
...
fs.hdfs.hadoopconf: /flink/conf
...
As you can see, I just put it in the <flink home>/conf directory. It has the following settings:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration>
  <property>
    <!-- S3A because it is better than the other S3 adapters: https://wiki.apache.org/hadoop/AmazonS3 -->
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <!-- this is my Riak CS host -->
    <name>fs.s3a.endpoint</name>
    <value>my-riak-cs.stage.local</value>
  </property>
  <property>
    <!-- my Riak CS in staging doesn't support SSL -->
    <name>fs.s3a.connection.ssl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <!-- this is my access key for Riak CS -->
    <name>fs.s3a.access.key</name>
    <value>????</value>
  </property>
  <property>
    <!-- this is my secret key for Riak CS -->
    <name>fs.s3a.secret.key</name>
    <value>????</value>
  </property>
</configuration>
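To sanity-check the file before starting Flink, you can load core-site.xml from the directory given in fs.hdfs.hadoopconf and list which of the fs.s3a.* keys used in this answer are missing or empty. A sketch with hypothetical helpers; treating these four keys as the required minimum is an assumption based on this answer, not a Hadoop rule.

```python
import os
import xml.etree.ElementTree as ET
from typing import Dict, List

# Assumption: the keys this answer sets, treated as the minimum for Riak CS
REQUIRED_S3A_KEYS = (
    "fs.s3a.impl",
    "fs.s3a.endpoint",
    "fs.s3a.access.key",
    "fs.s3a.secret.key",
)


def load_hadoop_conf(conf_dir: str) -> Dict[str, str]:
    """Read <conf_dir>/core-site.xml into a name -> value dict.

    ElementTree's default parser skips XML comments, so <!-- ... --> notes are ignored.
    """
    tree = ET.parse(os.path.join(conf_dir, "core-site.xml"))
    props: Dict[str, str] = {}
    for prop in tree.getroot().iter("property"):
        name = prop.findtext("name", default="").strip()
        value = prop.findtext("value", default="").strip()
        if name:
            props[name] = value
    return props


def missing_s3a_keys(props: Dict[str, str]) -> List[str]:
    # A key counts as missing if it is absent or has an empty value
    return [key for key in REQUIRED_S3A_KEYS if not props.get(key)]
```

Usage: `missing_s3a_keys(load_hadoop_conf("/flink/conf"))` should return an empty list for a complete configuration.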
Then you should configure the Riak CS buckets in flink-conf.yaml as recommended here:
...
state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
...
recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
...
and create the buckets in Riak CS. I am using s3cmd (installed via brew in my OS X dev environment):
s3cmd mb s3://example-staging-flink
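The bucket to create is the host part of each s3a:// URI, so the `s3cmd mb` commands can be derived from flink-conf.yaml. A sketch with hypothetical helpers; the parser is a simplified reader for flat `key: value` lines, not Flink's own configuration loader.

```python
from typing import Dict, List
from urllib.parse import urlparse

# Flink keys from this answer whose values point at Riak CS via s3a://
S3A_PATH_KEYS = ("state.backend.fs.checkpointdir", "recovery.zookeeper.storageDir")


def parse_flink_conf(text: str) -> Dict[str, str]:
    """Parse flat 'key: value' lines, skipping comments and lines without a separator."""
    conf: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if ": " not in line:
            continue
        key, value = line.split(": ", 1)
        conf[key.strip()] = value.strip()
    return conf


def buckets_to_create(conf: Dict[str, str]) -> List[str]:
    buckets: List[str] = []
    for key in S3A_PATH_KEYS:
        uri = urlparse(conf.get(key, ""))
        # s3a://<bucket>/<path>: the network location is the bucket name
        if uri.scheme == "s3a" and uri.netloc and uri.netloc not in buckets:
            buckets.append(uri.netloc)
    return buckets


def mb_commands(buckets: List[str]) -> List[str]:
    return [f"s3cmd mb s3://{bucket}" for bucket in buckets]
```

With the two settings above, both paths live in the same bucket, so only one `s3cmd mb s3://example-staging-flink` command is produced.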
FYI: before using s3cmd, you should configure it with s3cmd --configure and then fix some settings in the ~/.s3cmd file:
# because Riak CS uses the S3 V2 interface
signature_v2 = True
# if you don't use SSL
use_https = False
access_key = ???
secret_key = ???
# your Riak CS host
host_base = my-riak-cs.stage.local
# bucket host format used by Riak CS
host_bucket = %(bucket)s.my-riak-cs.stage.local
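The ~/.s3cmd file is an INI-style file with a [default] section, and host_bucket uses a Python %-style `%(bucket)s` placeholder (s3cmd's own default is `%(bucket)s.s3.amazonaws.com`). The sketch below renders only the keys from this answer with hypothetical helpers; a real file written by `s3cmd --configure` has many more settings, so in practice you would edit these keys in place.

```python
import configparser
import io


def render_s3cmd_config(host: str, access_key: str, secret_key: str, use_https: bool) -> str:
    # RawConfigParser keeps "%(bucket)s" literal instead of interpolating it
    cfg = configparser.RawConfigParser()
    cfg["default"] = {
        "signature_v2": "True",  # Riak CS uses the S3 V2 interface
        "use_https": str(use_https),
        "access_key": access_key,
        "secret_key": secret_key,
        "host_base": host,
        "host_bucket": "%(bucket)s." + host,
    }
    out = io.StringIO()
    cfg.write(out)
    return out.getvalue()


def bucket_host(host_bucket: str, bucket: str) -> str:
    # Expand the placeholder the way %-formatting does
    return host_bucket % {"bucket": bucket}
```

For example, expanding `%(bucket)s.my-riak-cs.stage.local` for the bucket `example-staging-flink` gives `example-staging-flink.my-riak-cs.stage.local`.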
So, that's all you need to configure to save and restore the state of a standalone HA Apache Flink cluster in Riak CS.