amazon-s3, apache-flink, flink-streaming, pyflink, apache-hudi

java.io.IOException: No FileSystem for scheme: s3 in Flink


I run Flink in Docker on my local environment, and I am trying to write a Flink job that uses CDC to sync MySQL data to S3 (stored in Apache Hudi format). My Flink job looks like this:

t_env = StreamTableEnvironment.create(env, environment_settings=settings)
t_env.execute_sql(f"""
            CREATE TABLE source_table (
                xxx
            ) WITH (
                'connector' = 'mysql-cdc',
                'hostname' = 'xxx',
                'port' = 'xxx',
                'username' = 'xxx',
                'password' = 'xxx',
                'database-name' = 'xxx',
                'table-name' = 'xxx',
                'scan.incremental.snapshot.enabled' = 'true',
                'scan.incremental.snapshot.chunk.key-column' = 'id'
            )
        """)
t_env.execute_sql(f"""
            CREATE TABLE hudi_table (
                xxx,
                PRIMARY KEY (id) NOT ENFORCED
            ) WITH (
                'connector' = 'hudi',
                'path' = 's3://path/to/hudi',
                'table.type' = 'COPY_ON_WRITE',
                'write.precombine.field' = 'id',
                'write.operation' = 'upsert',
                'hoodie.datasource.write.recordkey.field' = 'id',
                'hoodie.datasource.write.partitionpath.field' = '',
                'write.tasks' = '1',
                'compaction.tasks' = '1',
                'compaction.async.enabled' = 'true'
            )
        """)
t_env.execute_sql(f"""
            INSERT INTO hudi_table
            SELECT * FROM source_table
        """)

But when I submit this job to Flink, it raises an error:

 org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hudi_table'.
...
Caused by: java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.hudi.hadoop.fs.HadoopFSUtils.getFs(HadoopFSUtils.java:126)
    ... 42 more

It seems my Flink installation cannot access S3. Which jars should I add to Flink so that it can access S3?


Solution

  • The No FileSystem for scheme: s3 error indicates that no S3 filesystem plugin is installed, so Flink (and the Hadoop classes Hudi uses) cannot resolve the s3:// scheme. Flink ships two S3 plugins, flink-s3-fs-presto and flink-s3-fs-hadoop; there appear to be compatibility issues between Hudi and flink-s3-fs-presto, so flink-s3-fs-hadoop is likely the way to go.
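
Since Flink 1.12, filesystem implementations are loaded through the plugin mechanism, so the jar must go into its own subdirectory under plugins/ rather than into lib/. A minimal sketch of installing flink-s3-fs-hadoop inside the Flink container (the Flink home path, version, and credential values are assumptions; adjust them to your setup):

```shell
# Assumed install location and version -- adjust to your image.
FLINK_HOME=/opt/flink
FLINK_VERSION=1.17.1

# Each filesystem plugin needs its own subdirectory under plugins/.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-$FLINK_VERSION.jar" \
   "$FLINK_HOME/plugins/s3-fs-hadoop/"

# Credentials can go into flink-conf.yaml (placeholder values below;
# IAM roles or environment variables are preferable in production).
cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
EOF
```

After copying the plugin and updating the configuration, restart the JobManager and TaskManager containers so the plugin is picked up, then resubmit the job.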