I run Flink in Docker on my local environment, and I'm trying to write a Flink job that uses CDC to sync MySQL data to S3 (stored in Apache Hudi format). My Flink job looks like:
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
t_env.execute_sql("""
    CREATE TABLE source_table (
        xxx
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'xxx',
        'port' = 'xxx',
        'username' = 'xxx',
        'password' = 'xxx',
        'database-name' = 'xxx',
        'table-name' = 'xxx',
        'scan.incremental.snapshot.enabled' = 'true',
        'scan.incremental.snapshot.chunk.key-column' = 'id'
    )
""")
t_env.execute_sql("""
    CREATE TABLE hudi_table (
        xxx,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'hudi',
        'path' = 's3://path/to/hudi',
        'table.type' = 'COPY_ON_WRITE',
        'write.precombine.field' = 'id',
        'write.operation' = 'upsert',
        'hoodie.datasource.write.recordkey.field' = 'id',
        'hoodie.datasource.write.partitionpath.field' = '',
        'write.tasks' = '1',
        'compaction.tasks' = '1',
        'compaction.async.enabled' = 'true'
    )
""")
t_env.execute_sql("""
    INSERT INTO hudi_table
    SELECT * FROM source_table
""")
But when I submit this job to Flink, it raises this error:
org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hudi_table'.
...
Caused by: java.io.IOException: No FileSystem for scheme: s3
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2799)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.hudi.hadoop.fs.HadoopFSUtils.getFs(HadoopFSUtils.java:126)
... 42 more
It seems my Flink installation can't access S3. Which jars should I add to Flink so it can access S3?
The "No FileSystem for scheme: s3" error indicates that no S3 filesystem plugin providing the s3 scheme is installed. There seem to be compatibility issues between Hudi and flink-s3-fs-presto, so flink-s3-fs-hadoop might be the way to go.
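A minimal sketch of enabling the bundled flink-s3-fs-hadoop plugin inside the Flink Docker image, assuming the standard image layout (FLINK_HOME at /opt/flink, the S3 jar shipped under opt/). The s3.access-key and s3.secret-key entries are Flink's built-in S3 credential options; skip them if your setup uses IAM roles or environment variables instead:

```shell
# Flink loads filesystem plugins from their own subdirectory under plugins/,
# so copy the S3 jar that ships in the distribution's opt/ directory there.
FLINK_HOME=/opt/flink
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME"/opt/flink-s3-fs-hadoop-*.jar "$FLINK_HOME/plugins/s3-fs-hadoop/"

# Optionally configure static S3 credentials (placeholder values shown).
cat >> "$FLINK_HOME/conf/flink-conf.yaml" <<'EOF'
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
EOF
```

Restart the JobManager and TaskManager containers afterwards so the plugin is picked up.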