I am trying to connect to an Oracle server using the Kafka Connect JDBC source connector. Here is my connector config (created through ksqlDB):
CREATE SOURCE CONNECTOR hsr_source_connector2
WITH (
'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.user' = '{User}',
'connection.password' = '{PASS}',
'mode' = 'bulk',
'query' = 'select * from <Tablename>',
'topic.prefix' = 'newuser_',
'characterEncoding'='UTF-8',
'transforms' = 'createKey,extractInt',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'USER_ID',
'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractInt.field' = 'USER_ID'
);
The topic gets created and the connector reads the data, but the values come through as junk characters:
rowtime: 2024/04/18 17:08:51.028 Z, key: VN185082, value: ☺►VN185082►VN185082♠FSC☻☻W
rowtime: 2024/04/18 17:08:51.028 Z, key: MD250626, value: ☺►MD250626►MD250626♠SME☻☻R
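For reference, this is what the topic looks like when printed from the ksqlDB CLI, e.g. with:

PRINT 'newuser_' FROM BEGINNING;

The control characters show that the values on the topic are clearly not plain JSON text.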
Stream Definition:
CREATE STREAM hsr_users
(
user_id varchar,
user_name varchar,
pswd varchar,
user_role varchar,
access_level varchar
)WITH (kafka_topic='newuser_', value_format='JSON', partitions=1);
Can you suggest how to apply the right encoding, and point me to the correct documentation for this? I am referring to this document: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
Thanks in advance!
I found the answer: this can be corrected by adding these properties:
CREATE SOURCE CONNECTOR hsr_source_connector3
WITH (
'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.user' = '{User}',
'connection.password' = '{PASS}',
'mode' = 'bulk',
'query' = 'select * from <Tablename>',
'topic.prefix' = 'newEncode_',
'characterEncoding' = 'AL32UTF8', -- Updated: Added charset parameter
'transforms' = 'createKey,extractInt',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'USER_ID',
'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter'= 'org.apache.kafka.connect.json.JsonConverter',
'transforms.extractInt.field' = 'USER_ID',
'key.converter.schemas.enable'='false',
'value.converter.schemas.enable'='false',
'errors.tolerance' = 'all',
'errors.log.enable' = 'true',
'errors.log.include.messages' = 'true'
);
This will give data in this format:
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2024/04/19 11:08:19.755 Z, key: "HG185035", value: {"USER_ID":"HG185035","USER_NAME":"HG185035","PSWD":null,"USER_ROLE":"SME","ACCESS_LEVEL":"R"}
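A stream over the new topic can now parse the values as JSON. A minimal sketch, assuming the same columns as before (hsr_users_json is an illustrative name; the topic is newEncode_ because in query mode the topic.prefix is used as the full topic name):

CREATE STREAM hsr_users_json
(
user_id varchar,
user_name varchar,
pswd varchar,
user_role varchar,
access_level varchar
)WITH (kafka_topic='newEncode_', value_format='JSON', partitions=1);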
Basically, I was missing the key.converter and value.converter properties. I got the right properties configured with the help of this Confluent blog post: https://www.confluent.io/en-gb/blog/kafka-connect-deep-dive-converters-serialization-explained/
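Once such a stream is defined (hsr_users_json above is just the illustrative name from the sketch), the columns can be queried directly, for example:

SELECT user_id, user_role, access_level
FROM hsr_users_json
EMIT CHANGES
LIMIT 2;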