I am trying to deserialize a Kafka record consumed from a Kafka topic using the code below. Even though the schema is correct, we are facing the error shown further down. Can you suggest what else could be wrong?
Code
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumReader;

public class AvroUtility {
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AvroUtility.class);

    public SpecificDatumReader<GenericRecord> datumReader() {
        String valueSchemaString = "my schema in form of json string";
        Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
        return new SpecificDatumReader<>(avroValueSchema);
    }
}
We used the above AvroUtility class in the code below:
ConsumerRecord<String, byte[]> record;
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
We are getting the error below:
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at com.myapp.controller.Replicator.lambda$replicator$0(Replicator.java:116)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.myapp.controller.Replicator.replicator(Replicator.java:104)
at com.myapp.SpringBootWithKafkaApplication.main(SpringBootWithKafkaApplication.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:51)
at org.springframework.boot.loader.PropertiesLauncher.main(PropertiesLauncher.java:597)
Here you are forgetting one thing:
ConsumerRecord<String, byte[]> record;
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
The binary payload you are passing into the binaryDecoder includes extra bytes:

[Magic][4-byte_schemaID][Avro bytes]

The datumReader doesn't expect this prefix, which causes the failure. Decoding the raw bytes would work if you were using plain Avro, but as you said, you serialize with the KafkaAvroSerializer, which prepends this 5-byte Schema Registry header.
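To see this framing concretely, the header can be inspected with plain java.nio: the first byte is the magic byte 0, followed by the schema ID as a big-endian int. This is an illustrative sketch; the class and method names are hypothetical:

```java
import java.nio.ByteBuffer;

public class WireFormatInspector {
    // Confluent wire format: [magic byte 0][4-byte big-endian schema ID][Avro binary]
    public static int readSchemaId(byte[] kafkaPayload) {
        ByteBuffer buffer = ByteBuffer.wrap(kafkaPayload);
        byte magic = buffer.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Not a Confluent-framed payload, magic byte was " + magic);
        }
        return buffer.getInt(); // the schema ID as registered in Schema Registry
    }

    public static void main(String[] args) {
        // a fake payload: magic 0, schema ID 42, then two Avro bytes
        byte[] payload = {0, 0, 0, 0, 42, 0x02, 0x06};
        System.out.println(readSchemaId(payload)); // prints 42
    }
}
```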
So, in order to read the payload properly, you could strip the header first:
byte[] kafkaPayload = record.value();
int schemaRegistryHeaderLength = 5; // 1 magic byte + 4-byte schema ID
byte[] avroData = Arrays.copyOfRange(kafkaPayload, schemaRegistryHeaderLength, kafkaPayload.length);
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
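A slightly safer variant of the stripping step (a sketch, with a hypothetical helper name) validates the magic byte first, so any payload that was not Confluent-framed fails fast instead of producing another confusing decode error:

```java
import java.util.Arrays;

public class PayloadStripper {
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // Returns only the raw Avro bytes, failing fast on non-Confluent payloads.
    public static byte[] stripHeader(byte[] kafkaPayload) {
        if (kafkaPayload == null || kafkaPayload.length < HEADER_LENGTH || kafkaPayload[0] != 0) {
            throw new IllegalArgumentException("Payload is not in the Confluent wire format");
        }
        return Arrays.copyOfRange(kafkaPayload, HEADER_LENGTH, kafkaPayload.length);
    }

    public static void main(String[] args) {
        byte[] payload = {0, 0, 0, 0, 42, 0x02, 0x06};
        System.out.println(Arrays.toString(stripHeader(payload))); // prints [2, 6]
    }
}
```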
If you are using Confluent, you could also use the KafkaAvroDeserializer, which strips the header and fetches the writer schema from the Schema Registry for you:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "yourRegistry:port");
KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(props, false); // false = configuring a value (not key) deserializer
GenericRecord deserializedRecord = (GenericRecord) deserializer.deserialize("topic", record.value());
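Alternatively, you could configure the deserializer on the consumer itself, so records arrive already decoded and no manual handling is needed. This is a config sketch assuming the Confluent serde artifacts are on the classpath; broker and registry addresses are placeholders:

```java
import java.util.Properties;

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "yourBroker:port");
consumerProps.put("group.id", "my-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProps.put("schema.registry.url", "yourRegistry:port");
// the consumer then yields ConsumerRecord<String, GenericRecord> values directly
```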