apache-flink apache-iceberg data-lakehouse

How to create an Iceberg table using Flink that is partitioned on STRUCT/ROW type's inner field?


I tried to create the table like this:

Flink SQL> create table test (nested ROW(id STRING, name STRING)) partitioned by (nested.id);

But I get this error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 78.
Was expecting one of:
    ")" ...
    "," ...

I've tried several variants, such as partitioned by ('nested.id') and partitioned by (nested.id), but they all fail. The docs don't mention this case either.


Solution

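  • There's close to no documentation on this, but I figured it out. Flink's SQL parser only accepts top-level column names in PARTITIONED BY, so define the partition spec with the Iceberg Java API instead. Its PartitionSpec builder accepts a dotted path (here address.city) to reference a field nested inside a struct: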

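    import static org.apache.iceberg.types.Types.NestedField.optional;
    
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.types.Types;
    
    // Fields inside the struct get their own IDs (4-7), just like top-level columns
    Schema schema = new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(2, "name", Types.StringType.get()),
            optional(3, "address", Types.StructType.of(
                    optional(4, "street", Types.StringType.get()),
                    optional(5, "city", Types.StringType.get()),
                    optional(6, "state", Types.StringType.get()),
                    optional(7, "zip", Types.IntegerType.get())
            ))
    );
    
    // A nested field is referenced by its dotted path: "address.city"
    PartitionSpec spec = PartitionSpec.builderFor(schema)
            .identity("id")
            .bucket("address.city", 16)
            .build();
    
    // `catalog` is an already-configured org.apache.iceberg.catalog.Catalog
    // (Hive, Hadoop, REST, ...); "db" / "test" is just an example identifier
    Table table = catalog.createTable(TableIdentifier.of("db", "test"), schema, spec);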
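To check that the bucket partition really reads the nested field, you can build the same schema and spec without any catalog and look at each partition field's source ID. This is a minimal sketch, assuming only the iceberg-api artifact on the classpath; the class name NestedPartitionSpecDemo is hypothetical. The bucket field should report source ID 5, which the schema maps back to address.city.

```java
import static org.apache.iceberg.types.Types.NestedField.optional;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical demo class: builds the schema and spec from the answer
// (no catalog involved) and prints which column each partition field reads.
public class NestedPartitionSpecDemo {

    static Schema buildSchema() {
        return new Schema(
                optional(1, "id", Types.IntegerType.get()),
                optional(2, "name", Types.StringType.get()),
                optional(3, "address", Types.StructType.of(
                        optional(4, "street", Types.StringType.get()),
                        optional(5, "city", Types.StringType.get()),
                        optional(6, "state", Types.StringType.get()),
                        optional(7, "zip", Types.IntegerType.get()))));
    }

    static PartitionSpec buildSpec(Schema schema) {
        // The dotted path "address.city" is resolved against the schema to field ID 5
        return PartitionSpec.builderFor(schema)
                .identity("id")
                .bucket("address.city", 16)
                .build();
    }

    public static void main(String[] args) {
        Schema schema = buildSchema();
        PartitionSpec spec = buildSpec(schema);
        for (PartitionField field : spec.fields()) {
            // sourceId() is the ID of the column the partition value is derived from
            System.out.println(field.name() + " <- "
                    + schema.findColumnName(field.sourceId())
                    + " (" + field.transform() + ")");
        }
    }
}
```

Running main prints one line per partition field; the partition field names are whatever defaults the builder assigns, so only the source column and transform are worth relying on.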