apache-kafka kafka-rest

Registering Schema ID with Topic using confluent_kafka for python


The only answer I have gotten so far is that you have to give the schema and the topic the same name, and that this should link them together. But after registering a schema named test_topic like:

{
  "type": "record",
  "name": "test_topic",
  "namespace": "com.test",
  "doc": "My test schema",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

and running the following command, the record is inserted without a problem.

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"

But when I run the following command as well, it also inserts without giving any error (note: I changed the property name):

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"

I would have expected an error message saying that my data does not match the schema for this topic...

My schema ID is 10, so I know the schema is registered and the Registry is working, but it is not very useful at the moment.

Python Code:

from confluent_kafka import Producer
import socket
import json

conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)

def acked(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}, {str(err)}')
    else:
        print(f'Message produced: {str(msg)}')

name = "My first name"
producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii'), callback=acked)

producer.poll(5)

Solution

  • you have to give the schema and the topic the same name, and then this should link them together

    That's not quite how the Schema Registry works.

    Each Kafka record has a key and a value.

    The Registry has subjects, which are not strictly mapped to topics.

    However, the Kafka clients (de)serializer implementation will use both topic-key and topic-value subject names to register/extract schemas from the registry.
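    For illustration, that default subject naming convention (Confluent's TopicNameStrategy) can be sketched as follows; the function name here is hypothetical, but the "-key"/"-value" suffix behavior is what the Confluent (de)serializers use by default:

    ```python
    def subject_name(topic: str, is_key: bool) -> str:
        """Sketch of the default TopicNameStrategy: the Registry subject is
        derived from the topic name plus a "-key" or "-value" suffix."""
        return f"{topic}-{'key' if is_key else 'value'}"

    # A value schema used with topic "test_topic" is registered under the
    # subject "test_topic-value", not under the bare topic name:
    print(subject_name("test_topic", is_key=False))  # test_topic-value
    print(subject_name("test_topic", is_key=True))   # test_topic-key
    ```

    So a subject that happens to be called test_topic is not automatically enforced against a topic called test_topic; it is the serializer on the client side that derives and uses these subject names.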

    Clients cannot tell the Registry what ID to put the schema at. That logic is calculated server-side.


    I'm not sure I understand what your post has to do with the REST Proxy, but you're posting plain JSON without telling the proxy that the data should be Avro (you're using the incorrect Content-Type header).

    If Avro is used, the content type must be application/vnd.kafka.avro.v2+json.
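    As a sketch, an Avro-aware version of your request could look like this, assuming the REST Proxy is reachable at the same URL and your registered schema really has ID 10 (the value_schema_id field tells the proxy which registered schema to serialize and validate the payload against):

    ```shell
    curl -X POST \
      -H "Content-Type: application/vnd.kafka.avro.v2+json" \
      -H "Accept: application/vnd.kafka.v2+json" \
      --data '{"value_schema_id": 10, "records": [{"value": {"name": "My first name"}}]}' \
      "http://localhost/topics/test_topic"
    ```

    With a request like this, posting {"test": "..."} instead of {"name": "..."} should be rejected, because the proxy now actually serializes the payload with the Avro schema rather than passing opaque JSON through.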