scalaapache-kafkaavroakka-streamavro4s

how to check avro schema registry usage


I am using avro through avro4s. This is my configuration for consumer/producer

 def producerSettings(system: ActorSystem): ProducerSettings[String, Array[Byte]] = ProducerSettings(
    system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers("localhost:9092")
    .withProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
    .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
    .withProperty("schema.registry.url", "http://localhost:8081")
    .withProperty("auto.create.topics.enable", "true")

  def consumerSettings(system: ActorSystem): ConsumerSettings[String, Array[Byte]] =
    ConsumerSettings(
      system,
      new StringDeserializer,
      new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      .withProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      .withProperty("key.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("value.converter.schema.registry.url", "http://localhost:8081")
      .withProperty("schema.registry.url", "http://localhost:8081")
      .withGroupId("test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

I have doubts that the registers are used. When my application is running it is silence in the schema registry logs.

How can i check that my application is use the registry?

And if it is not - how to fix it?


Solution

  • You're using the wrong classes, so your properties will likely have errors

    You actually need to use KafkaAvroSerializer for the Producer here

    new StringSerializer,
    new ByteArraySerializer)
    

    And KafkaAvroDeserializer for the consumer here

    new StringDeserializer,
    new ByteArrayDeserializer)
    

    And try to change String, Array[Byte] to GenericRecord or some case-class you made from Avro4s