
CompletionException in KafkaListener while consuming Avro-format events


I am getting a CompletionException while using a KafkaListener to consume Avro-format events from Azure Event Hubs. The error log:

java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[na:na]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
    at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:81) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
    at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:28) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) ~[kafka-clients-3.6.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) ~[kafka-clients-3.6.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421) ~[spring-kafka-3.1.1.jar:3.1.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    ... 1 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.azure.core.models.MessageContent
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
    ... 16 common frames omitted

The deserializer settings in my listener configuration are:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);

In pom.xml I have this dependency, which provides the KafkaAvroDeserializer class:

<dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-schemaregistry-kafka-avro</artifactId>
<!--            <version>1.0.0-beta.9</version>-->
            <version>1.1.1</version>
        </dependency>

With version "1.0.0-beta.9" of this dependency I could read events without problems, but after changing it to "1.1.1" I get this error.

I tried replacing KafkaAvroDeserializer with a byte deserializer in the KafkaListener configuration, but that didn't work. I expect to receive the deserialized event/message in a readable form.


Solution

  • The newer version of the azure-schemaregistry-kafka-avro library has additional dependencies that need to be explicitly included. The NoClassDefFoundError for com.azure.core.models.MessageContent suggests that version 1.1.1 needs the azure-core library on the classpath.

    Update your pom.xml with the necessary dependencies as shown below.

    pom.xml:

    <dependency>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-schemaregistry-kafka-avro</artifactId>
        <version>1.1.1</version>
    </dependency>
    
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-core</artifactId>
        <version>1.31.0</version>
    </dependency>
    

    The azure-core dependency is required because it provides the com.azure.core.models.MessageContent class named in the error, so it must be present on the runtime classpath.
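
    To confirm that the class is actually visible at runtime, a small check like the following can help. This is only a diagnostic sketch using plain JDK calls; the class name ClasspathCheck and the method locate are made up for illustration and are not part of any Azure or Kafka library. It should be run inside the same application (for example at startup), so that it sees the same classpath as the listener.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical helper (not part of any library): reports where a class is loaded from.
public final class ClasspathCheck {

    private ClasspathCheck() {}

    /** Returns the location the class is loaded from, or empty if it cannot be loaded. */
    public static Optional<String> locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes from the JDK itself usually have no code source
            if (source == null || source.getLocation() == null) {
                return Optional.of("<JDK runtime>");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            // Same situation as in the stack trace: the class is not on the classpath
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Defaults to the class reported in the NoClassDefFoundError
        String name = args.length > 0 ? args[0] : "com.azure.core.models.MessageContent";
        System.out.println(name + " -> " + locate(name).orElse("NOT FOUND on the classpath"));
    }
}
```

    If the class is found, the printed location shows which azure-core jar it comes from. If it is reported as not found even though azure-core is declared, running mvn dependency:tree -Dincludes=com.azure:azure-core may show which azure-core version Maven actually resolved.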

    Here is an example of handling the deserialized events:

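    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;

    // Invoked for each record after the value has been deserialized from Avro
    @KafkaListener(topics = "your-topic", groupId = "your-group-id")
    public void listen(ConsumerRecord<String, SpecificRecord> record) {
        String key = record.key();
        SpecificRecord value = record.value();

        System.out.println("Received message with key: " + key + " and value: " + value);

        // Process the event
        processEvent(key, value);
    }

    public void processEvent(String key, SpecificRecord value) {
        // write event processing logic here
    }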
    

    If the application is set up correctly and the dependency issue is fixed as described above, the listener should receive and process messages successfully.

    Runtime logs:

    2024-05-21 12:00:00 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Starting Kafka listener endpoint container [KafkaListenerEndpointContainer#0-0-C-1]
    2024-05-21 12:00:00 INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [your-bootstrap-servers]
        check.crcs = true
        client.dns.lookup = default
        client.id = 
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = your-group-id
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        ...
        value.deserializer = class com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer
    2024-05-21 12:00:00 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.6.1
    2024-05-21 12:00:00 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: abc123def456
    2024-05-21 12:00:00 INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-your-group-id-1, groupId=your-group-id] Subscribed to topic(s): your-topic
    2024-05-21 12:00:01 INFO  com.example.YourKafkaListener - Received message with key: key1 and value: {"field1": "value1", "field2": 123}
    2024-05-21 12:00:01 INFO  com.example.YourKafkaListener - Processing event with key: key1 and value: {"field1": "value1", "field2": 123}
    2024-05-21 12:00:02 INFO  com.example.YourKafkaListener - Received message with key: key2 and value: {"field1": "value2", "field2": 456}
    2024-05-21 12:00:02 INFO  com.example.YourKafkaListener - Processing event with key: key2 and value: {"field1": "value2", "field2": 456}