I'm trying out PyFlink for streaming data from Kinesis into Hudi format, but I can't figure out why it isn't writing any data. I hope someone can provide some pointers.
Versions: Flink 1.15.4, Python 3.7, Hudi 0.13.0
I use a streaming table environment:
from pyflink.common import Configuration
from pyflink.table import DataTypes, EnvironmentSettings, Schema, StreamTableEnvironment, TableDescriptor

configuration = Configuration()
env_settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .with_configuration(configuration)
    .build()
)
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
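For completeness: the Hudi Flink bundle jar has to be on the job's classpath for the "hudi" connector to resolve. I add it roughly like this (the jar path below is a placeholder, not my real path):

# Placeholder path: the hudi-flink bundle must be on the classpath
# for the "hudi" connector to be discoverable.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file:///path/to/hudi-flink1.15-bundle-0.13.0.jar"
)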
This is my schema:
def get_test_table_schema() -> Schema:
    return (
        Schema.new_builder()
        .column("uuid", DataTypes.STRING().not_null())
        .column("updated_at", DataTypes.TIMESTAMP().not_null())
        .column("shard_key", DataTypes.STRING().not_null())
        .primary_key("uuid")
        .build()
    )
This is how I define the output sink:
table_env.create_table(
    "test_table",
    TableDescriptor.for_connector("hudi")
    .schema(get_test_table_schema())
    .option("path", "file://mypath/hudi/test.table")
    .option("table.primaryKey", "uuid")
    .option("table.preCombineField", "updated_at")
    .partitioned_by("shard_key")
    .build(),
)
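As an optional sanity check, the registered sink table can be inspected to confirm the columns and primary key came through:

# Optional sanity check: print the registered table's schema.
table_env.execute_sql("DESCRIBE test_table").print()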
Manually inserting some data works and creates the files in the output path:
result = table_env.execute_sql("""insert into test_table values
    ('1', TIMESTAMP '1970-01-01 00:00:01', 'par1'),
    ('2', TIMESTAMP '1970-01-01 00:00:02', 'par1'),
    ('3', TIMESTAMP '1970-01-01 00:00:03', 'par2');
""")
The job finishes in the Flink UI, and the stream_write: test_table task shows that some data was written.
However, if I stream the records from a source such as Kinesis:
table_env.create_table("kinesis", get_kinesis_source())
input_table = table_env.from_path("kinesis")
input_table.execute_insert("test_table")
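For reference, get_kinesis_source() builds a TableDescriptor roughly like the sketch below; the stream name, region, and format are placeholders rather than my real values:

def get_kinesis_source() -> TableDescriptor:
    # Sketch only: stream name, region, and format are placeholders.
    return (
        TableDescriptor.for_connector("kinesis")
        .schema(
            Schema.new_builder()
            .column("uuid", DataTypes.STRING())
            .column("updated_at", DataTypes.TIMESTAMP())
            .column("shard_key", DataTypes.STRING())
            .build()
        )
        .option("stream", "my-input-stream")
        .option("aws.region", "eu-west-1")
        .option("scan.stream.initpos", "LATEST")
        .option("format", "json")
        .build()
    )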
I can see that the records are retrieved, but they are never written to Hudi.
There are no exceptions and nothing seems broken; I must be missing something obvious.
Looks like I was just not pushing enough data through the pipeline: with a low-volume test stream, Hudi's write buffer never filled, so nothing was flushed. I also reduced write.batch.size from its default of 256 MB down to 1 MB (the option value is in MB) and started seeing the updates in the target path:
table_env.create_table(
    "test_table",
    TableDescriptor.for_connector("hudi")
    .schema(get_test_table_schema())
    .option("path", "file://mypath/hudi/test.table")
    .option("table.primaryKey", "uuid")
    .option("table.preCombineField", "updated_at")
    .option("write.batch.size", "1")
    .partitioned_by("shard_key")
    .build(),
)