apache-kafka, apache-kafka-connect, ksqldb

ksqlDB fails to deserialize JSON key


I'm trying to load data into a ksqlDB source table by using the JDBC Source connector in Kafka Connect. The topic key is a JSON object created from two of the columns. ksqlDB can't deserialize the key and throws an error. Is it possible to create a source table in ksqlDB with two columns as primary keys that are deserialized from the JSON key?

The connector config is:

CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
  'connector.class'             = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'              = 'jdbc:postgresql://cockroachdb:26257/db',
  'connection.user'             = 'user',
  'connection.password'         = 'password',
  'connection.options'          = '-c multiple_active_portals_enabled=true',
  'topic.prefix'                = 'jdbc_',
  'table.whitelist'             = 'whitelist_table',
  'mode'                        = 'bulk',
  'numeric.mapping'             = 'best_fit',
  "poll.interval.ms"            = 1800000,
  'transforms'                  = 'createKey',
  'transforms.createKey.type'   = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'c1,c2',
  'topic.creation.default.partitions' = 3,
  'topic.creation.default.replication.factor' = 3
);

I create the Kafka key from two columns, 'c1' and 'c2' (see the transforms config). This works fine, and I can see that the messages in the topic have a JSON key in the form:

{
  "c1": "c1v1",
  "c2": "c2v1"
}

I'm trying to create a SOURCE table in ksqlDB with c1 and c2 as the PRIMARY KEY columns:

CREATE SOURCE TABLE whitelist_table (
  C1 VARCHAR PRIMARY KEY,
  C2 VARCHAR PRIMARY KEY,
  C3 VARCHAR,
  C4 TIMESTAMP,
  C5 INT
) WITH (
  KAFKA_TOPIC='jdbc_whitelist_table',
  KEY_FORMAT='JSON',
  VALUE_FORMAT='JSON'
);

ksqlDB creates the table but can't deserialize the key. I get the following error message:

ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: jdbc_model_load_shedding. Unrecognized token 'Struct': was expecting ( JSON String, Number, Array, Object or token 'null', 'true' or 'false')","recordB64":null,"cause":["Unrecognized token 'Struct': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')"],"topic":"jdbc_whitelist_table"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CST_WHITELIST_TABLE_11.KsqlTopic.Source.deserializer:44)

Solution

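  • An error that mentions an unrecognized 'Struct' token means the key is being written with StringConverter. That converter writes the Connect Struct as its toString() text (something like Struct{c1=c1v1,c2=c2v1}), which is not valid JSON, so ksqlDB's JSON deserializer rejects it. Set JsonConverter as the key converter (and as the value converter, if the value shows the same problem) in the connector config.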
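
As a rough sketch, the connector from the question could be recreated with explicit converter settings along these lines. The four converter properties are the only additions; everything else is copied from the question. Turning off schemas.enable is an assumption based on ksqlDB's JSON format expecting plain JSON rather than the schema/payload envelope that JsonConverter emits by default, so check it against your setup.

```sql
CREATE SOURCE CONNECTOR IF NOT EXISTS loadshedding_model_source WITH (
  'connector.class'             = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'              = 'jdbc:postgresql://cockroachdb:26257/db',
  'connection.user'             = 'user',
  'connection.password'         = 'password',
  'connection.options'          = '-c multiple_active_portals_enabled=true',
  'topic.prefix'                = 'jdbc_',
  'table.whitelist'             = 'whitelist_table',
  'mode'                        = 'bulk',
  'numeric.mapping'             = 'best_fit',
  'poll.interval.ms'            = 1800000,
  'transforms'                  = 'createKey',
  'transforms.createKey.type'   = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'c1,c2',
  -- Added: serialize key and value as plain JSON instead of Struct.toString() text
  'key.converter'                  = 'org.apache.kafka.connect.json.JsonConverter',
  'key.converter.schemas.enable'   = 'false',
  'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'topic.creation.default.partitions' = 3,
  'topic.creation.default.replication.factor' = 3
);
```

Because of IF NOT EXISTS, an existing connector with the same name has to be dropped first for the new settings to take effect. Records already written with the old converter stay in the topic and will most likely keep failing to deserialize, so it may be simplest to start from a fresh topic.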