I'm trying to use Flink CDC to capture data changes from MySQL and upsert them into a Hudi table on S3. My PyFlink job looks like this:
# Imports needed to run this snippet; `config` is assumed to be a
# pyflink.common.Configuration built elsewhere (placeholder shown here).
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

config = Configuration()
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(1)
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
# MySQL CDC source table
t_env.execute_sql("""
CREATE TABLE source (
...
) WITH (
'connector' = 'mysql-cdc',
...
)
""")
# Hudi sink table on S3 (COPY_ON_WRITE, upserts keyed on id)
t_env.execute_sql("""
CREATE TABLE target (
...
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxx/xx/xx',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field' = '',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.async.enabled' = 'true',
'hoodie.table.version' = 'SIX',
'hoodie.write.table.version' = '6'
)
""")
# Stream changes from the CDC source into the Hudi table
t_env.execute_sql("""
INSERT INTO target
SELECT * FROM source
""")
But when I submit this job to Flink, it returns an error:
org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
What does this error mean? Does it mean S3 is not supported as a sink in this situation, i.e., that Flink cannot upsert data into S3?
Resolved by setting 'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider' in the Hudi table options. The error comes from a file-system based lock provider that relies on atomic file creation, which the s3a scheme does not support; the in-process lock provider avoids that requirement, so S3 itself is not the problem.
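For reference, a minimal sketch of where that option slots into the sink DDL above (column list still elided, all other options unchanged from the original table definition):

t_env.execute_sql("""
CREATE TABLE target (
...
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxx/xx/xx',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'id',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field' = '',
'write.tasks' = '1',
'compaction.tasks' = '1',
'compaction.async.enabled' = 'true',
'hoodie.table.version' = 'SIX',
'hoodie.write.table.version' = '6',
-- lock provider that does not require atomic file creation on the target filesystem
'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
)
""")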