
Confluent JDBC source connector: which mode to use with a custom query


I am using a custom query in the JDBC Kafka source connector. Can anyone tell me which mode to use with a custom query? If I use bulk mode, it reinserts all the data into the Kafka topic on every poll. Note: I don't have a primary key or timestamp column in my table.


Solution

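  • To avoid reloading everything, you can use incrementing, timestamp, or timestamp+incrementing mode. Each of these needs a suitable column in the table or query result; if there is neither a strictly incrementing column nor a timestamp column, bulk is the only mode that will work.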

    incrementing - use a strictly incrementing column on each table to detect only new rows. Note that this will not detect modifications or deletions of existing rows.

    timestamp - use a timestamp (or timestamp-like) column to detect new and modified rows. This assumes the column is updated with each write, and that values are monotonically incrementing, but not necessarily unique.

    timestamp+incrementing - use two columns, a timestamp column that detects new and modified rows and a strictly incrementing column which provides a globally unique ID for updates so each row can be assigned a unique stream offset.

    Example for timestamp:

    name=mysql-source-test
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=10
    
    connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
    table.whitelist=users,products
    
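    # Detect new and modified rows via the last_modified column
    mode=timestamp
    timestamp.column.name=last_modified
    
    # Each table is written to the topic <prefix><table>, e.g. mysql-test-users
    topic.prefix=mysql-test-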
    

    Example for incrementing:

    name=mysql-source-test
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=10
    
    connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
    table.whitelist=users,products
    
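    # Detect new rows only, via the strictly incrementing id column
    mode=incrementing
    incrementing.column.name=id
    
    # Each table is written to the topic <prefix><table>, e.g. mysql-test-users
    topic.prefix=mysql-test-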
    

    Example for timestamp+incrementing:

    name=mysql-source-test
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=10
    
    connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
    table.whitelist=users,products
    
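    # Detect new and modified rows; id makes each row's offset unique
    mode=timestamp+incrementing
    incrementing.column.name=id
    timestamp.column.name=last_modified
    
    # Each table is written to the topic <prefix><table>, e.g. mysql-test-users
    topic.prefix=mysql-test-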
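
    Example for a custom query with incrementing:

    The examples above copy whole tables. For the custom-query case in the question, a minimal sketch might look like the following. It assumes the query result exposes a strictly incrementing column called id; the connector name, the selected columns, and the topic name are hypothetical. As far as I know, in the incremental modes the connector appends its own WHERE clause to the query, so the query itself should not contain one, and when query is set, topic.prefix is used as the full topic name rather than as a prefix.

    # Hypothetical connector name
    name=mysql-source-query-test
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    # A single custom query is processed by one task
    tasks.max=1

    connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
    # Use query instead of table.whitelist; no WHERE clause here,
    # because the connector adds its own for the incremental mode
    query=SELECT id, name, email FROM users

    # Assumes id is strictly incrementing in the query result
    mode=incrementing
    incrementing.column.name=id

    # With query set, this is the complete topic name (hypothetical)
    topic.prefix=mysql-test-users-query

    If the query result has no such column, this sketch does not apply, and the query would have to run in bulk mode or the table would need an incrementing or timestamp column added.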