apache-flink riak-cs

Is it possible to use Riak CS with Apache Flink?


I want to configure the filesystem state backend and ZooKeeper recovery mode:

state.backend: filesystem
state.backend.fs.checkpointdir: ???

recovery.mode: zookeeper
recovery.zookeeper.storageDir: ???

As you can see, I have to specify the checkpointdir and storageDir parameters, but I don't have any of the file systems supported by Apache Flink (such as HDFS or Amazon S3). I do, however, have a Riak CS cluster installed, and it appears to be compatible with S3.

So, can I use Riak CS together with Apache Flink? If so, how do I configure Apache Flink to work with Riak CS?


Solution

  • Answer: How to join Apache Flink and Riak CS?

    Riak CS has an S3 (version 2) compatible interface, so it is possible to use the S3 file system adapter from Hadoop to work with Riak CS.

    I don't know why, but Apache Flink bundles only some of the Hadoop file system adapters in its fat jar (lib/flink-dist_2.11-1.0.1.jar): it has the FTP file system (org.apache.hadoop.fs.ftp.FTPFileSystem) but not the S3 file system (org.apache.hadoop.fs.s3a.S3AFileSystem). So you have 2 ways to solve this problem: use the adapters from an existing Hadoop installation, or place the required JARs in Flink's lib directory yourself.

    I chose the second way because I don't want to provision Hadoop in my environment. You can copy the JARs from a Hadoop distribution or download them from the internet:

    curl http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.jar -o /flink/lib/hadoop-aws-2.7.2.jar
    curl http://central.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar -o /flink/lib/aws-java-sdk-1.7.4.jar
    curl http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.2.5/httpcore-4.2.5.jar -o /flink/lib/httpcore-4.2.5.jar
    curl http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar -o /flink/lib/httpclient-4.2.5.jar
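
    A quick way to confirm the downloads landed where Flink will look for them is sketched below. The function name missing_jars is hypothetical (not part of Flink or Hadoop), and the default directory /flink/lib is simply the target used in the curl commands above. It only checks that the files exist, not that they are valid JARs.

```shell
#!/bin/sh
# Print the name of every JAR from the curl commands above that is missing
# from the given lib directory (defaults to /flink/lib).
# Hypothetical helper: checks file presence only, not JAR contents.
missing_jars() {
    lib_dir="${1:-/flink/lib}"
    for jar in hadoop-aws-2.7.2.jar aws-java-sdk-1.7.4.jar \
               httpcore-4.2.5.jar httpclient-4.2.5.jar; do
        [ -f "$lib_dir/$jar" ] || echo "$jar"
    done
}
```

    Calling missing_jars with no argument checks /flink/lib; empty output means all four files are present.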
    

    As you can see, I am using old versions because these are the versions used by Hadoop 2.7.2, and my Flink build is compatible with that version of Hadoop.

    FYI: This hack can cause problems if your own flow uses newer versions of these JARs. To avoid version conflicts, you can relocate the packages when building the fat jar for your flow, with something like this (I am using Gradle):

    // Relocate the org.apache.http packages because Apache Flink now loads an old version of this library (the JARs we placed in lib to use the S3-compatible FS)
    shadowJar {
        dependencies {
            include(dependency('.*:.*:.*'))
        }
    
        relocate 'org.apache.http', 'relocated.org.apache.http'
        relocate 'org.apache.commons', 'relocated.org.apache.commons'
    }
    

    Then specify the path to the directory containing core-site.xml in flink-conf.yaml, because Hadoop-compatible file systems load their settings from this config:

    ...
    fs.hdfs.hadoopconf: /flink/conf
    ...
    

    As you can see, I just placed it in the <flink home>/conf directory. It has the following settings:

    <?xml version="1.0" encoding="UTF-8" ?>
    <configuration>
        <property>
            <name>fs.s3a.impl</name>
            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> <!-- because S3A is better than the other S3 adapters: https://wiki.apache.org/hadoop/AmazonS3 -->
        </property>
        <property>
            <name>fs.s3a.endpoint</name>
            <value>my-riak-cs.stage.local</value> <!-- this is my Riak CS host -->
        </property>
        <property>
            <name>fs.s3a.connection.ssl.enabled</name> <!-- my Riak CS in staging doesn't support SSL -->
            <value>false</value>
        </property>
        <property>
            <name>fs.s3a.access.key</name>
            <value>????</value> <!-- this is my access key for Riak CS -->
        </property>
        <property>
            <name>fs.s3a.secret.key</name>
            <value>????</value> <!-- this is my secret key for Riak CS -->
        </property>
    </configuration>
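
    Since a missing property here usually only shows up later as a failed checkpoint, it can help to check the file before starting Flink. The sketch below lists which of the five fs.s3a.* properties from the core-site.xml above are absent from a given file. The function name missing_s3a_properties is hypothetical, and it does a plain text match on <name> elements rather than a real XML parse.

```shell
#!/bin/sh
# Print every fs.s3a.* property name from the core-site.xml above that does
# not appear as a <name> element in the given file.
# Hypothetical helper: plain grep on the text, not an XML parser.
missing_s3a_properties() {
    conf_file="$1"
    for name in fs.s3a.impl fs.s3a.endpoint fs.s3a.connection.ssl.enabled \
                fs.s3a.access.key fs.s3a.secret.key; do
        grep -qF "<name>$name</name>" "$conf_file" || echo "$name"
    done
}
```

    For the setup described here it would be called as missing_s3a_properties /flink/conf/core-site.xml; empty output means all five names were found.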
    

    Then configure the Riak CS buckets in flink-conf.yaml, as recommended here:

    ...
    state.backend.fs.checkpointdir: s3a://example-staging-flink/checkpoints
    ...
    recovery.zookeeper.storageDir: s3a://example-staging-flink/recovery
    ...
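
    Every bucket named in these s3a:// URIs has to exist in Riak CS before Flink writes to it. As a rough sketch, the bucket names can be pulled out of the config file like this. The function name s3a_buckets is hypothetical, and lines where a # precedes the URI are skipped on the assumption that they are commented out.

```shell
#!/bin/sh
# Print the distinct bucket names referenced by s3a:// URIs in a config file.
# Lines where '#' appears before the URI are treated as comments and skipped.
# Hypothetical helper for illustration.
s3a_buckets() {
    sed -n 's|^[^#]*s3a://\([^/[:space:]]*\).*|\1|p' "$1" | sort -u
}
```

    With the two settings above, s3a_buckets /flink/conf/flink-conf.yaml prints example-staging-flink once, since both paths live in the same bucket.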
    

    and create the bucket in Riak CS. I am using s3cmd (installed via brew in my OS X dev environment):

    s3cmd mb s3://example-staging-flink
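
    Before pointing Flink at the new bucket, it may be worth confirming that the credentials allow writing, listing and deleting. The following is a minimal sketch using the standard s3cmd put, ls and del commands. The function name bucket_smoke_test and the object key smoke-test-probe are made up for this example, and it assumes s3cmd is already configured for your Riak CS host.

```shell
#!/bin/sh
# Round-trip a small object through a bucket: upload, list, delete.
# Returns non-zero as soon as any s3cmd step fails.
# Hypothetical helper; assumes s3cmd is configured for the Riak CS host.
bucket_smoke_test() {
    bucket="$1"
    probe=$(mktemp) || return 1
    echo "flink-riak-cs probe" > "$probe"
    key="s3://$bucket/smoke-test-probe"
    s3cmd put "$probe" "$key" \
        && s3cmd ls "s3://$bucket/" \
        && s3cmd del "$key"
    status=$?
    rm -f "$probe"
    return "$status"
}
```

    For the bucket created above: bucket_smoke_test example-staging-flink. A non-zero exit status means one of the three steps failed.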
    

    FYI: Before using s3cmd, configure it with s3cmd --configure and then fix some settings in the ~/.s3cfg file:

    # because Riak CS uses the S3 V2 interface
    signature_v2 = True
    # if you don't use SSL
    use_https = False
    access_key = ???
    secret_key = ???
    # your Riak CS host
    host_base = my-riak-cs.stage.local
    # format of the bucket host used by Riak CS
    host_bucket = %(bucket)s.my-riak-cs.stage.local
    

    So, that's all you need to configure to save and restore the state of a standalone HA Apache Flink cluster in Riak CS.