I am getting the following error when I run kafka JDBC connector to PSQL:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
However my topic contains the following message structure with a schema added just like its presented online:
rowtime: 2022/02/04 12:45:48.520 Z, key: , value: "{"schema": {"type": "struct", "fields": [{"type": "int", "field": "ID", "optional": false}, {"type": "date", "field": "Date", "optional": false}, {"type": "varchar", "field": "ICD", "optional": false}, {"type": "int", "field": "CPT", "optional": false}, {"type": "double", "field": "Cost", "optional": false}], "optional": false, "name": "test"}, "payload": {"ID": "24427934", "Date": "2019-05-22", "ICD": "883.436", "CPT": "60502", "cost": "1374.36"}}", partition: 0
My configuration for the connector is:
curl -X PUT http://localhost:8083/connectors/claim_test/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/ae2772",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"topics":"test_7",
"auto.create":"true",
"insert.mode":"insert"
}'
After some changes, I now get the following message:
WorkerSinkTask{id=claim_test} Error converting message value in topic 'test_9' partition 0 at offset 0 and timestamp 1644005137197: Unknown schema type: int
int
is not a valid schema type. Should be int8
, int16
, int32
, or int64
.
Similarly, date
, varchar
and double
are not valid either.
The types used in the JSON are not the same as Postgres or any SQL-specific types (a date should be converted to a Unix Epoch int64
time or be made a string
).
You can find the supported schema types here: https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java