Tags: apache-kafka, gis, geo, debezium

How can I deserialize geometry fields from Kafka messages streamed via Debezium Connect?


I have a PostGIS + Debezium/Kafka + Debezium/Connect setup that is streaming changes from one database to another. I have been watching the messages via Kowl and everything is flowing as expected.

My problem arises when I read the messages from my Kafka topic, the geometry (wkb) column in particular.

This is my Kafka message:

{
    "schema":{
        "type":"struct",
        "fields":[...],
        "optional":false,
        "name":"ecotx_geometry_kafka.ecotx_geometry_impo..."
    },
    "payload":{
        "before":null,
        "after":{
            "id":"d6ad5eb9-d1cb-4f91-949c-7cfb59fb07e2",
            "type":"MultiPolygon",
            "layer_id":"244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8",
            "geometry":{
                "wkb":"AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUAAABwQfUo...",
                "srid":2154
            },
            "custom_style":null,
            "style_id":"default_layer_style"
        },
        "source":{...},
        "op":"c",
        "ts_ms":1618854994546,
        "transaction":null
    }
}

As can be seen, the WKB value is something like "AQAAAAA...", despite the information inserted into my database being "01060000208A7A000000000000" (or, as WKT, "LINESTRING(0 0,1 0)").

And I don't know how to parse/transform it into a ByteArray or a Geometry in my consumer app (Kotlin/Java) for further use with GeoTools.

I don't know if I'm missing an import that could translate this information.

I have found only a few questions where people posted their JSON messages, and in every message with a geometry field (streamed with Debezium) the value had been changed to something like "AAAQQQAAAA".

Having said that, how can I parse/decode/translate it into something that can be used by GeoTools?
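For reference, this is roughly what I am attempting on the consumer side. A minimal sketch using only the JDK (`java.util.Base64`, plus `java.util.HexFormat` from Java 17); the JTS `WKBReader` call is shown only as a comment, since it needs the JTS/GeoTools dependency on the classpath:

```java
import java.util.Base64;
import java.util.HexFormat;

// Minimal sketch: turn the Base64 "wkb" string from the Kafka message
// back into the raw WKB bytes that PostGIS originally stored.
public class WkbDecoder {

    // Decode the JSON converter's Base64 representation to raw WKB bytes.
    public static byte[] fromBase64(String base64Wkb) {
        return Base64.getDecoder().decode(base64Wkb);
    }

    // Render the bytes as the HEX string PostGIS prints (for comparison).
    public static String toHex(byte[] wkb) {
        return HexFormat.of().withUpperCase().formatHex(wkb);
    }

    // With JTS/GeoTools on the classpath, the bytes parse directly, e.g.:
    // Geometry g = new org.locationtech.jts.io.WKBReader().read(fromBase64(wkb));
}
```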

Thanks.

@UPDATE

Additional info:

After an insert, when I analyze my slot changes (querying the database using the pg_logical_slot_get_changes function), I'm able to see my changes in WKB:

{"change":[{"kind":"insert","schema":"ecotx_geometry_import","table":"geometry_data","columnnames":["id","type","layer_id","geometry","custom_style","style_id"],"columntypes":["uuid","character varying(255)","uuid","geometry","character varying","character varying"],"columnvalues":["469f5aed-a2ea-48ca-b7d2-fe6e54b27053","MultiPolygon","244458fa-e6e0-4c6c-a7e1-5bf0afce2fb8","01060000206A08000001000000010300000001000000050000007041F528CB332C413B509BE9710A594134371E05CC332C4111F40B87720A594147E56566CD332C4198DF5D7F720A594185EF3C8ACC332C41C03BEDE1710A59417041F528CB332C413B509BE9710A5941",null,"default_layer_style"]}]}

That value would be usable in the consumer app, so the issue definitely lies in the Kafka message content itself; I'm just not sure what is transforming this value, Kafka or Debezium/Connect.


Solution

  • I think it is just a different way to represent binary columns in PostGIS and in JSON. The WKB is a binary field, meaning it holds bytes with arbitrary values, many of which have no corresponding printable characters. PostGIS prints it using HEX encoding, which is why it looks like '01060000208A7A...' (hex digits), but internally it is just bytes. Kafka's JSON converter uses BASE64 encoding instead for exactly the same binary value.

    Let's test with a prefix of your string (in PostgreSQL, decode the HEX string and re-encode it as Base64):

    select encode(decode('01060000206A080000010000000103000000010000000500', 'hex'), 'base64');
    
    AQYAACBqCAAAAQAAAAEDAAAAAQAAAAUA
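The same round trip can be verified in the consumer's language. A small sketch in Java (stdlib only; `HexFormat` needs Java 17+) that re-encodes the HEX string from pg_logical_slot_get_changes into the Base64 form seen in the Kafka message:

```java
import java.util.Base64;
import java.util.HexFormat;

// Sketch: re-encode PostGIS's HEX output as Base64, reproducing what
// Debezium's JSON converter emits for the binary wkb field.
public class HexToBase64 {
    public static String convert(String hexWkb) {
        byte[] raw = HexFormat.of().parseHex(hexWkb);   // HEX -> raw bytes
        return Base64.getEncoder().encodeToString(raw); // raw bytes -> Base64
    }
}
```

If the result matches the "wkb" value in the message, the bytes were never altered, only re-encoded.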