apache-kafkaavroapache-kafka-connectconfluent-schema-registryconfluent-control-center

Produce Avro messages in Confluent Control Center UI


To develop a data transfer application I need first define a key/value avro schemas. The producer application is not developed yet till define the avro schema.

I cloned a topic and its key/value avro schemas that are already working and and also cloned the the jdbc snink connector. Simply I just changed the topic and connector names.

Then I copied and existing message successfully sent sink using Confluent Topic Message UI Producer. enter image description here

But it is sending the error: "Unknown magic byte!"

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:323)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        ... 17 more
[2022-07-25 03:45:42,385] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)

Reading other questions it seems the message has to be serialized using the schema.

Unknown magic byte with kafka-avro-console-consumer

is it possible to send a message to a topic with AVRO key/value schemas using the Confluent Topic UI?

Any idea if the avro schemas need information depending on the connector/source? or if namespace depends on the topic name?

This is my key schema. And the topic's name is knov_03

{
  "connect.name": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming.Key",
  "fields": [
    {
      "name": "id_sap_incoming",
      "type": "long"
    }
  ],
  "name": "Key",
  "namespace": "dbserv1.MY_DB_SCHEMA.ps_sap_incoming",
  "type": "record"
}

enter image description here

Connector:

{
  "name": "knov_05",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "knov_03",
    "connection.url": "jdbc:mysql://eXXXXX:3306/MY_DB_SCHEMA?useSSL=FALSE&nullCatalogMeansCurrent=true",
    "connection.user": "USER",
    "connection.password": "PASSWORD",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id_sap_incoming",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Thanks.


Solution

  • As of the current version of Confluent Control Center, it does not support sending messages in Avro format directly through its UI. The Control Center is primarily designed for managing and monitoring your Kafka environment.

    For producing Avro messages, you would typically use other tools provided by Confluent, such as the Confluent REST Proxy or the Kafka Avro Console Producer. Both of these tools allow you to send Avro messages to Kafka, but they require that your Avro schema is registered with the Confluent Schema Registry.