I'm trying to load data into a ksqlDB source table by using the JDBC Source connector in Kafka Connect. The topic key is a JSON object created from two of the columns. ksqlDB can't deserialize the key and throws an error. Is it possible to create a source table in ksqlDB with two columns as primary keys that are deserialized from the JSON key?
The connector config is:
CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://cockroachdb:26257/db',
'connection.user' = 'user',
'connection.password' = 'password',
'connection.options' = '-c multiple_active_portals_enabled=true',
'topic.prefix' = 'jdbc_',
'table.whitelist' = 'whitelist_table',
'mode' = 'bulk',
'numeric.mapping' = 'best_fit',
'poll.interval.ms' = 1800000,
'transforms' = 'createKey',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'c1,c2',
'topic.creation.default.partitions' = 3,
'topic.creation.default.replication.factor' = 3
);
I create the Kafka key from two columns, 'c1' and 'c2' (see the transforms config). This works fine, and I can see that the messages in the topic have a JSON key in the form:
{
"c1": "c1v1",
"c2": "c2v1"
}
I'm trying to create a SOURCE table in ksqlDB with c1 and c2 as the PRIMARY KEY columns with:
CREATE SOURCE TABLE whitelist_table (
C1 VARCHAR PRIMARY KEY,
C2 VARCHAR PRIMARY KEY,
C3 VARCHAR,
C4 TIMESTAMP,
C5 INT
) WITH (
KAFKA_TOPIC='jdbc_whitelist_table',
KEY_FORMAT='JSON',
VALUE_FORMAT='JSON'
);
ksqlDB creates the table but can't deserialize the key. I get the following error message:
ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: jdbc_model_load_shedding. Unrecognized token 'Struct': was expecting ( JSON String, Number, Array, Object or token 'null', 'true' or 'false')","recordB64":null,"cause":["Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"],"topic":"jdbc_whitelist_table"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CST_WHITELIST_TABLE_11.KsqlTopic.Source.deserializer:44)
If you see an error mentioning a Struct token/string, then you are using StringConverter, which will not work for JSON data. You need to set JsonConverter for the key/value converter settings in the connector config.
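As a rough sketch of that change, the connector from the question could be recreated with explicit converter properties. The four `key.converter` / `value.converter` lines are the only additions. Setting `schemas.enable` to `false` is an assumption here: it makes the converter write plain JSON without a schema envelope, which is what a ksqlDB table declared with KEY_FORMAT='JSON' would most likely expect.

```sql
-- Sketch only: same connector as in the question, plus converter settings.
CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url' = 'jdbc:postgresql://cockroachdb:26257/db',
  'connection.user' = 'user',
  'connection.password' = 'password',
  'connection.options' = '-c multiple_active_portals_enabled=true',
  'topic.prefix' = 'jdbc_',
  'table.whitelist' = 'whitelist_table',
  'mode' = 'bulk',
  'numeric.mapping' = 'best_fit',
  'poll.interval.ms' = 1800000,
  'transforms' = 'createKey',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'c1,c2',
  -- Serialize the key struct as JSON instead of its Struct{...} string form.
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  -- Assumption: schemaless JSON, so the key is just {"c1": ..., "c2": ...}.
  'key.converter.schemas.enable' = 'false',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'topic.creation.default.partitions' = 3,
  'topic.creation.default.replication.factor' = 3
);
```

Records already written to the topic with the old converter keep their Struct-style string keys, so they will probably still fail to deserialize until the topic is recreated or repopulated.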