I am trying to connect to Postgres (PostGIS) from Flink and read data from tables that have UUID and geography columns:
CREATE TABLE area_of_interest (
    id uuid NOT NULL,
    geometry public.geography(multipolygon, 4326) NULL,
    CONSTRAINT "id_pk" PRIMARY KEY (id)
);
I have already seen the question "Flink JDBC UUID – source connector", but that solution does not work with Flink 1.18.
I tried to register the table using the following approaches, but they all fail:
TableDescriptor
    .forConnector("jdbc")
    .option(...)
    ...
    .schema(
        Schema.newBuilder()
            .column("id", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
            // .column("id", DataTypes.RAW(UUID.class).notNull()) // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('java.util.UUID', '...') NOT NULL.
            .column("geometry", DataTypes.STRING().notNull()) // doesn't work -> java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
            // .column("geometry", DataTypes.RAW(Geometry.class)) // doesn't work -> Caused by: org.apache.flink.table.api.ValidationException: The PostgreSQL dialect doesn't support type: RAW('org.locationtech.jts.geom.Geometry', '...')
            // .columnByExpression("geometry", "cast(geometry as varchar)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            // .columnByExpression("geometry", "ST_asEWKT(geometry)") // doesn't work -> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'geometry'
            .primaryKey("id")
            .build())
    .build()
Error when trying to map the java.util.UUID column (the Postgres column is uuid) to java.lang.String:
java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
Error when trying to map the geography column (the Postgres column is geography(multipolygon, 4326)) to java.lang.String:
java.lang.ClassCastException: class org.postgresql.util.PGobject cannot be cast to class java.lang.String (org.postgresql.util.PGobject is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$createInternalConverter$224afae6$10(AbstractJdbcRowConverter.java:176) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56) ~[flink-connector-jdbc-3.1.2-1.18.jar:3.1.2-1.18]
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:97) ~[flink-streaming-java-1.18.1.jar:1.18.1]
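Both traces end in the same converter lambda, which suggests that for STRING columns the connector casts whatever object the JDBC driver returns directly to String. The following is a minimal sketch of that failure mode outside Flink. It is an illustration of the cast only, not the connector's actual code; the class name `CastDemo` and both helper methods are hypothetical.

```java
import java.util.UUID;

final class CastDemo {
    // Mimics a converter that assumes the driver handed back a String.
    // For a uuid column the Postgres driver returns java.util.UUID instead.
    static String castToString(Object jdbcValue) {
        return (String) jdbcValue; // ClassCastException for a UUID
    }

    // Using the textual form avoids the cast; this is roughly what
    // "id::text" achieves on the database side.
    static String toText(Object jdbcValue) {
        return String.valueOf(jdbcValue);
    }

    public static void main(String[] args) {
        Object value = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println("text form: " + toText(value));
        try {
            castToString(value);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```

The same reasoning would apply to the geography column, where the driver returns an org.postgresql.util.PGobject rather than a String.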
What is the intended way to map these PostGIS types in the Flink Table & SQL API? Alternatively, is there another approach that would let me connect to PostGIS and read columns of these types?
I tried several ways of mapping those columns on the Flink side, with no success.
To work around the issue, I created a view in Postgres that casts the id to text and exposes the geometry as EWKB bytes:
create or replace view vw_area_of_interest as
select
    id::text as id,
    st_asewkb(cast(geometry as geometry)) as geometry
from
    area_of_interest
where
    geometry is not null;
I then pointed the JDBC table at the view and used a schema like this:
Schema.newBuilder()
    .column("id", DataTypes.STRING().notNull())
    .column("geometry", DataTypes.BYTES().notNull())
    .primaryKey("id")
    .build()
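With this schema the job receives the id as a string and the geometry as raw EWKB bytes, so any typed handling has to happen downstream. Below is a minimal sketch of that step using only the JDK: it turns the id back into a java.util.UUID and reads the EWKB header (byte order, geometry type, optional SRID). The class `ViewRowDecoder` and its methods are hypothetical names for illustration, and the header layout is the PostGIS EWKB convention as I understand it; decoding the full geometry would need a WKB-capable library such as JTS.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

final class ViewRowDecoder {
    // EWKB sets this bit in the type word when an SRID follows.
    private static final int SRID_FLAG = 0x20000000;
    // Clears the Z, M and SRID flag bits, leaving the base geometry type.
    private static final int TYPE_MASK = 0x1FFFFFFF;

    // The view exposes "id::text", so the string parses back into a UUID.
    static UUID parseId(String id) {
        return UUID.fromString(id);
    }

    // Returns {baseGeometryType, srid}; srid is 0 when the header has none.
    static int[] readEwkbHeader(byte[] ewkb) {
        ByteBuffer buf = ByteBuffer.wrap(ewkb);
        // First byte: 1 = little endian, 0 = big endian.
        buf.order(buf.get() == 1 ? ByteOrder.LITTLE_ENDIAN : ByteOrder.BIG_ENDIAN);
        int typeWord = buf.getInt();
        int srid = (typeWord & SRID_FLAG) != 0 ? buf.getInt() : 0;
        return new int[] {typeWord & TYPE_MASK, srid};
    }

    public static void main(String[] args) {
        // Header of a little-endian MultiPolygon (type 6) with SRID 4326.
        byte[] header = {
            0x01,
            0x06, 0x00, 0x00, 0x20,
            (byte) 0xE6, 0x10, 0x00, 0x00
        };
        int[] parsed = readEwkbHeader(header);
        System.out.println("type=" + parsed[0] + ", srid=" + parsed[1]);
        System.out.println(parseId("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```

Checking the SRID this way is a cheap sanity test that the view really returns EWKB for the expected coordinate system before handing the bytes to a geometry library.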