I am getting a CompletionException while using KafkaListener to listen for Avro-format events from Azure Event Hubs. The error logs:
java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: java.lang.NoClassDefFoundError: com/azure/core/models/MessageContent
at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:81) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:28) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:na]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1262) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186) ~[kafka-clients-3.6.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) ~[kafka-clients-3.6.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1649) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1624) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1421) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
... 1 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.azure.core.models.MessageContent
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[na:na]
... 16 common frames omitted
The deserialization settings I used in the listener configuration are:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.class);
In the pom.xml file I have this dependency, which provides the KafkaAvroDeserializer class:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<!-- <version>1.0.0-beta.9</version>-->
<version>1.1.1</version>
</dependency>
With version "1.0.0-beta.9" of this dependency I was able to read events without problems, but after changing it to version "1.1.1" I see this error.
I tried replacing KafkaAvroDeserializer with a byte deserializer in the KafkaListener configuration, but that didn't work. I expect to see the deserialized event/message in readable form.
The NoClassDefFoundError for com.azure.core.models.MessageContent indicates that a transitive dependency required by azure-schemaregistry-kafka-avro is either missing or resolved to an incompatible version. Version 1.1.1 of the library relies on classes from azure-core, so azure-core has to be declared explicitly, in a version that contains this class.
Update your pom.xml with the necessary dependencies:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-schemaregistry-kafka-avro</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.31.0</version>
</dependency>
The azure-core entry is the required addition.
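To confirm that the change took effect, one option is to check at runtime whether the class can be resolved and which jar it comes from. The following is only a minimal sketch: the class name ClasspathCheck and the method locate are hypothetical helpers, not part of any Azure or Kafka library, and it has to run with the same classpath as the application to be meaningful.

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper; not part of azure-core or the Kafka client.
public class ClasspathCheck {

    /** Returns where the class was loaded from, or empty if it cannot be resolved. */
    static Optional<String> locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> type = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes shipped with the JDK have no code source location
            if (source == null || source.getLocation() == null) {
                return Optional.of("JDK runtime");
            }
            return Optional.of(source.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace
        String name = args.length > 0 ? args[0] : "com.azure.core.models.MessageContent";
        System.out.println(name + " -> " + locate(name).orElse("NOT FOUND on classpath"));
    }
}
```

If the class is reported as not found, or as coming from an unexpected jar, running mvn dependency:tree -Dincludes=com.azure:azure-core shows which azure-core version Maven actually resolved.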
Here is an example of handling the deserialized event:
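import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class YourKafkaListener {
    // Invoked once per record, after the configured value deserializer has run
    @KafkaListener(topics = "your-topic", groupId = "your-group-id")
    public void listen(ConsumerRecord<String, SpecificRecord> record) {
        String key = record.key();
        SpecificRecord value = record.value();
        System.out.println("Received message with key: " + key + " and value: " + value);
        // Process the event
        processEvent(key, value);
    }
    public void processEvent(String key, SpecificRecord value) {
        // write event processing logic here
    }
}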
If the application is set up correctly and the dependency issue is fixed as described above, you should receive and process messages successfully.
Runtime logs:
2024-05-21 12:00:00 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 - Starting Kafka listener endpoint container [KafkaListenerEndpointContainer#0-0-C-1]
2024-05-21 12:00:00 INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [your-bootstrap-servers]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = your-group-id
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
...
value.deserializer = class com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer
2024-05-21 12:00:00 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.6.1
2024-05-21 12:00:00 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: abc123def456
2024-05-21 12:00:00 INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-your-group-id-1, groupId=your-group-id] Subscribed to topic(s): your-topic
2024-05-21 12:00:01 INFO com.example.YourKafkaListener - Received message with key: key1 and value: {"field1": "value1", "field2": 123}
2024-05-21 12:00:01 INFO com.example.YourKafkaListener - Processing event with key: key1 and value: {"field1": "value1", "field2": 123}
2024-05-21 12:00:02 INFO com.example.YourKafkaListener - Received message with key: key2 and value: {"field1": "value2", "field2": 456}
2024-05-21 12:00:02 INFO com.example.YourKafkaListener - Processing event with key: key2 and value: {"field1": "value2", "field2": 456}