
Sedona + Flink JDBC and PostGIS data types (UUID, Geography)


I am trying to connect to Postgres (PostGIS) and read data from a table that has UUID and Geography columns:

CREATE TABLE area_of_interest (
    id uuid NOT NULL,
    geometry public.geography(multipolygon, 4326) NULL,
    CONSTRAINT "id_pk" PRIMARY KEY (id)
);

I have already seen the question "Flink JDBC UUID – source connector", but that solution does not work with Flink 1.18.

I tried to register the table using the following approaches, but they all fail:

TableDescriptor
    .forConnector("jdbc")
    .option(...)
    ...
    .schema(
        Schema.newBuilder()
            .column("id", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
            // .column("id", DataTypes.RAW(UUID.class).notNull())  // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('java.util.UUID', '...') NOT NULL.
            .column("geometry", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
            // .column("geometry", DataTypes.RAW(Geometry.class))  // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('org.locationtech.jts.geom.Geometry', '...')
            // .columnByExpression("geometry", "cast(geometry as varchar)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            // .columnByExpression("geometry", "ST_asEWKT(geometry)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            .primaryKey("id")
            .build())
    .build()

Error when trying to map the java.util.UUID (Postgres column is uuid) column to java.lang.String:

java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]

Error when trying to map the Geography (Postgres column is geography(multipolygon, 4326)) column to java.lang.String:

java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97) ~[flink-streaming-java-1.18.1.jar:1.18.1]
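
My reading of both traces, which I have not verified against the connector source, is that the converter for STRING columns casts whatever object the driver returns straight to java.lang.String. The sketch below reproduces that kind of failure with plain Java. `UuidCastDemo`, `castToString` and `renderAsString` are hypothetical names of mine, not Flink or JDBC APIs.

```java
import java.util.UUID;

final class UuidCastDemo {
    // Assumption: mimics a converter that expects the driver to hand back a String.
    static String castToString(Object driverValue) {
        return (String) driverValue;
    }

    // Works for any non-null driver object because it calls toString() instead of casting.
    static String renderAsString(Object driverValue) {
        return String.valueOf(driverValue);
    }

    public static void main(String[] args) {
        // Stand-in for the object a uuid column yields (java.util.UUID, per the first trace).
        Object fromDriver = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        try {
            castToString(fromDriver);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }
        System.out.println("rendered: " + renderAsString(fromDriver));
    }
}
```

If that reading is right, the conversion to text has to happen before the value reaches the converter, for example on the database side.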

What is the intended way to map these PostGIS types in the Flink Table & SQL API? Alternatively, is there another approach that would let me connect to PostGIS and read columns of these types?

I tried to map those columns using different approaches with no success.


Solution

  • To work around this issue, I created a view in Postgres that casts the UUID to text and converts the geography to EWKB bytes:

    create or replace
    view vw_area_of_interest as
    select
        id::text as id,
        st_asewkb(cast(geometry as geometry)) as geometry
    from
        area_of_interest
    where
        geometry is not null;
    

    I then used a schema like this on the Flink side:

    Schema.newBuilder()
        .column("id", DataTypes.STRING().notNull())
        .column("geometry", DataTypes.BYTES().notNull())
        .primaryKey("id")
        .build()
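
To sanity-check what ends up in the BYTES column, here is a minimal sketch of reading the EWKB header. It assumes the usual PostGIS EWKB layout: one byte-order byte, a 4-byte type word whose 0x20000000 bit signals an SRID, then the optional 4-byte SRID. `EwkbHeader` and its members are hypothetical helpers, and the sample bytes are hand-built (an empty multipolygon with SRID 4326), not data from my table.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class EwkbHeader {
    static final int Z_FLAG = 0x80000000;
    static final int M_FLAG = 0x40000000;
    static final int SRID_FLAG = 0x20000000;

    final int geometryType; // base WKB type code, e.g. 6 = MultiPolygon
    final Integer srid;     // null when the SRID flag is not set

    private EwkbHeader(int geometryType, Integer srid) {
        this.geometryType = geometryType;
        this.srid = srid;
    }

    static EwkbHeader parse(byte[] ewkb) {
        ByteBuffer buf = ByteBuffer.wrap(ewkb);
        // First byte selects the byte order: 1 = little-endian, 0 = big-endian.
        buf.order(buf.get() == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
        int rawType = buf.getInt();
        // The SRID follows the type word only when the SRID flag is set.
        Integer srid = (rawType & SRID_FLAG) != 0 ? Integer.valueOf(buf.getInt()) : null;
        // Strip the flag bits to get the base geometry type.
        int baseType = rawType & ~(Z_FLAG | M_FLAG | SRID_FLAG);
        return new EwkbHeader(baseType, srid);
    }

    // Hand-built sample: little-endian, MultiPolygon with SRID flag, SRID 4326, zero polygons.
    static byte[] sampleEmptyMultiPolygon() {
        return new byte[] {
            0x01,
            0x06, 0x00, 0x00, 0x20,
            (byte) 0xE6, 0x10, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00
        };
    }

    public static void main(String[] args) {
        EwkbHeader header = parse(sampleEmptyMultiPolygon());
        System.out.println("type=" + header.geometryType + " srid=" + header.srid);
    }
}
```

In a real job the byte array would come from the `geometry` column of the view rather than from the sample method.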