apache-kafka, apache-kafka-connect, jdbc-postgres

Kafka Connect JDBC error despite giving schema and payload


I am getting the following error when I run the Kafka Connect JDBC sink connector to PostgreSQL:

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

However, my topic contains the following message structure, with a schema added just as it's presented online:

rowtime: 2022/02/04 12:45:48.520 Z, key: , partition: 0, value:

    {
      "schema": {
        "type": "struct",
        "fields": [
          {"type": "int",     "field": "ID",   "optional": false},
          {"type": "date",    "field": "Date", "optional": false},
          {"type": "varchar", "field": "ICD",  "optional": false},
          {"type": "int",     "field": "CPT",  "optional": false},
          {"type": "double",  "field": "Cost", "optional": false}
        ],
        "optional": false,
        "name": "test"
      },
      "payload": {
        "ID": "24427934",
        "Date": "2019-05-22",
        "ICD": "883.436",
        "CPT": "60502",
        "cost": "1374.36"
      }
    }

My configuration for the connector is:

    curl -X PUT http://localhost:8083/connectors/claim_test/config \
      -H "Content-Type: application/json" \
      -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/ae2772",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "topics": "test_7",
        "auto.create": "true",
        "insert.mode": "insert"
      }'
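
The connector's state, including the full stack trace behind converter errors like the one above, can be read back from the standard Kafka Connect REST API (using the same host and connector name as in the configuration above):

    curl http://localhost:8083/connectors/claim_test/status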

After some changes, I now get the following message:

WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int


Solution

  • int is not a valid schema type. It should be int8, int16, int32, or int64.

    Similarly, date and varchar are not valid. (Of the SQL-looking names in your schema, only double is accepted by the JsonConverter, which maps it to Connect's FLOAT64 type.)

    The types used in the JSON schema are Kafka Connect types, not Postgres or any other SQL-specific types (a date should be converted to a Unix epoch int64 timestamp or be made a string). A corrected version of the message is sketched below.

    You can find the supported schema types here: https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java
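
For illustration, a minimal corrected version of the message value from the question, using only type names the JsonConverter understands, might look like this (field names are kept from the question; note that the original schema declares Cost while the payload used cost, and the names must match exactly; values for numeric types should also be JSON numbers rather than quoted strings):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {"type": "int32",  "field": "ID",   "optional": false},
          {"type": "string", "field": "Date", "optional": false},
          {"type": "string", "field": "ICD",  "optional": false},
          {"type": "int32",  "field": "CPT",  "optional": false},
          {"type": "double", "field": "Cost", "optional": false}
        ],
        "optional": false,
        "name": "test"
      },
      "payload": {
        "ID": 24427934,
        "Date": "2019-05-22",
        "ICD": "883.436",
        "CPT": 60502,
        "Cost": 1374.36
      }
    }

With this shape, auto.create should be able to create the target table. Date lands as plain text here; to get a real SQL date/timestamp column, the epoch-time route mentioned above would be needed instead.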