jdbc · apache-kafka · apache-kafka-connect

Query-based JDBC Source Connector for Kafka


I have a legacy database whose primary key column is a string (yeah, I know). I want to do an incremental dump from the Postgres DB into Kafka topics using the JDBC Kafka Source Connector in incrementing mode.

Below is my attempt to recreate the problem

create table test(
id varchar(20) primary key,
name varchar(10) 
);

INSERT INTO test (id, name)
VALUES ('1ab', 't'),
       ('2ab', 't'),
       ('3ab', 't');
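
The query handed to the connector can also be run directly against Postgres first, to confirm that the cast really turns the string key into an integer. A rough check, assuming psql can reach the same host as in the connection URL below (no database name is given there, so point it at whichever database holds the table):

# host and user taken from the connection URL; add -d <database> as needed
psql -h 12.34.5.6 -p 5432 -U user \
     -c "SELECT cast(replace(id, 'ab', '') AS integer) AS id, name FROM test ORDER BY id ASC"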

My config

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}

After I posted the config, the status was RUNNING when I checked it with an HTTP curl. There were also no errors in the worker's log, and no data in the Kafka topic when I ran a console consumer. I also tried several other combinations, like adding "table.whitelist": "test".
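
For reference, this is roughly what those checks boil down to (hosts are assumptions here: the Connect REST API on localhost:8083 and the broker on localhost:9092):

# connector-level state can be RUNNING while a task has failed, so look at the task status too
curl -s http://localhost:8083/connectors/test_connector/status

# list topics to see whether the connector created anything at all
kafka-topics --bootstrap-server localhost:9092 --list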

Another thing I tried was following these two links: https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/ and https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector, but neither helped, not even the smart trick suggested there of wrapping the query like SELECT * from (SELECT id, name from test where ...)


Solution

  • So after a few hours of playing with different configurations, I went back to the official documentation and realised this:

    Use a custom query instead of loading tables, allowing you to join data from multiple tables. As long as the query does not include its own filtering, you can still use the built-in modes for incremental queries (in this case, using a timestamp column). Note that this limits you to a single output per connector and because there is no table name, the topic “prefix” is actually the full topic name in this case.

    So the key is that with a custom query the "topic.prefix" is the full topic name, in this case "topic.prefix": "incre_test".

    Following up on the previous setting, the proper config should be:

    {"name" : "test_connector",
        "config" : {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
            "connection.user": "user",
            "connection.password": "password",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "topic.prefix": "incre_test",
            "mode": "incrementing",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
            "incrementing.column.name":"id",
            "value.converter.schema.registry.url": "http://schema-registry_url.com",
            "key.converter.schema.registry.url": "http://schema-registry_url.com",
            "offset.flush.timeout.ms": 2000,
    
        }
    }
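
    With the corrected prefix the records land on the incre_test topic, which can be verified with the Avro console consumer (a sketch, assuming the broker is on localhost:9092 and the same Schema Registry URL as in the config):

    kafka-avro-console-consumer --bootstrap-server localhost:9092 \
        --topic incre_test --from-beginning \
        --property schema.registry.url=http://schema-registry_url.com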