mongodb apache-kafka confluent-schema-registry mongodb-kafka-connector

Kafka Connect with the MongoDB connector adds "-value" to the topic name


I produce events to a Kafka (confluentinc/cp-server:7.0.1) topic, my-topic, and I use a Schema Registry (confluentinc/cp-schema-registry:7.1.0) to store my Protobuf schemas.

I use Kafka Connect (confluentinc/cp-kafka-connect:7.0.1-1-ubi8) to connect to MongoDB:

"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics":          "my-topic",
"key.converter.schemas.enable": "false",
"key.converter":                "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter":              "io.confluent.connect.protobuf.ProtobufConverter",
"connection.uri":               "mongodb://mongo:27017",
"database":                     "my-database",
"collection":                   "my-collection",
"tasksMax":                     "1",

The problem is that the connector looks up the subject my-topic-value when it contacts the Schema Registry. That subject does not exist, of course, so the binary payloads cannot be deserialized. The connector fails with a SerializationException (error code 40401) and reports that my-topic-value cannot be found.

So I published events to a topic named my-topic-value (without changing the connector, which still has "topics": "my-topic"). That works, but it is not a viable solution. Any ideas on how to fix this?

Thank you.


Solution

  • Topics have keys and values. Both can optionally have distinct schemas, so each maps to its own subject in the Registry (my-topic-key and my-topic-value by default) rather than to the topic name itself.

    The subject naming strategy can be changed, but not in a way that simply removes the suffix. See https://developer.confluent.io/courses/schema-registry/schema-subjects/

    Changing the topic name isn't the correct solution. Instead, register the schema under the correct subject (my-topic-value), which any producer using the Protobuf serializer should have done automatically (unless it also overrides the subject name strategy).
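
    As a rough illustration of the point above, here is a minimal Python sketch that derives the subject name the way the default TopicNameStrategy does and builds a request to register a Protobuf schema under it through the Schema Registry REST API (POST /subjects/{subject}/versions). The helper names (topic_name_subject, build_register_request, send) and the MyEvent schema in the usage note are hypothetical; the registry URL and topic name come from the question. Treat it as a sketch, not as the way the asker's producer actually registers schemas.

```python
import json
import urllib.request


def topic_name_subject(topic: str, is_key: bool = False) -> str:
    """Subject name under the default TopicNameStrategy: <topic>-key or <topic>-value."""
    return f"{topic}-{'key' if is_key else 'value'}"


def build_register_request(registry_url: str, topic: str, proto_schema: str) -> urllib.request.Request:
    """Build (but do not send) a request that registers a value schema for `topic`."""
    subject = topic_name_subject(topic)  # e.g. "my-topic-value"
    body = json.dumps({"schemaType": "PROTOBUF", "schema": proto_schema}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and return the registry's parsed JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

    For example, send(build_register_request("http://schema-registry:8081", "my-topic", 'syntax = "proto3"; message MyEvent { string id = 1; }')) would post the schema to the my-topic-value subject, which is where the connector's ProtobufConverter looks by default.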