Following the question and suggestions here: Kafka JDBCSinkConnector Schema exception: JsonConverter with schemas.enable requires "schema" and "payload", I am trying to sink records into Redshift using the Confluent Redshift Sink connector and a producer written in Python. Here is the connector config:
connector.class=io.confluent.connect.aws.redshift.RedshiftSinkConnector
aws.redshift.port=5439
confluent.topic.bootstrap.servers=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
tasks.max=2
topics=test_topic_3
aws.redshift.password=xxxxxxxxxxxxxxxxxxxxxxxxxxxx
aws.redshift.domain=xxxxxxxxxxxxxxxxxxxxxxxx.redshift.amazonaws.com
aws.redshift.database=xxxxxxxxxxxxxxxxxxx
confluent.topic.replication.factor=1
aws.redshift.user=xxxxxxxxxxxxxxxxxxxxxxxx
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
pk.mode=kafka
The content of the schema file is as follows:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"url"}],"optional":false,"name":"test_data"},"payload":{"id":12,"url":"some_url"}}
and the Python code is:
from kafka import KafkaProducer
import json
producer = KafkaProducer(
    bootstrap_servers="xxxxxxxxxxxxx",
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

with open("connector_test_schema.json", 'r') as file:
    read = file.read()

for i in range(1):
    producer.send("test_topic_3", key='abc'.encode('utf-8'), value=read)

producer.close()
I still get the following error:
[Worker-0fbc0b18922b147e0] org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Is there something wrong with what I'm doing?
As suggested by @OneCricketeer and confirmed, the value was being encoded twice: the file already contains serialized JSON, and the json.dumps call in the value_serializer serialized that string again, so JsonConverter received a JSON string instead of a JSON object and rejected it.
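To see the failure mode, here is a small illustration (the JSON below is a hypothetical, shortened version of the file contents):

import json

# `read` is already a serialized JSON document, i.e. a plain str:
read = '{"schema": {"type": "struct"}, "payload": {"id": 12}}'

# json.dumps serializes the *string itself*, producing a quoted,
# escaped JSON string rather than a JSON object:
print(json.dumps(read))
# "{\"schema\": {\"type\": \"struct\"}, \"payload\": {\"id\": 12}}"

# JsonConverter then sees a top-level string with no "schema" or
# "payload" fields, hence the DataException.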
Solution: only encode the string read from the JSON file, without calling json.dumps on it:
value_serializer=lambda v: v.encode('utf-8')
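For completeness, a minimal sketch of the corrected producer, reusing the same topic, file, and redacted bootstrap servers as in the question:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="xxxxxxxxxxxxx",
    # The file already contains serialized JSON; only encode it to bytes:
    value_serializer=lambda v: v.encode('utf-8')
)

with open("connector_test_schema.json", 'r') as file:
    read = file.read()

producer.send("test_topic_3", key='abc'.encode('utf-8'), value=read)
producer.flush()  # ensure the record is delivered before closing
producer.close()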