We’re trying to decide between providing generic vs specific record formats for consumption by our clients with an eye to providing an online schema registry clients can access when the schemas are updated. We expect to send out serialized blobs prefixed with a few bytes denoting the version number so schema retrieval from our registry can be automated.
Now, we’ve come across code examples illustrating the relative adaptability of the generic format for schema changes but we’re reluctant to give up the type safety and ease-of-use provided by the specific format.
Is there a way to obtain the best of both worlds? I.e. could we work with and manipulate the specific generated
classes internally and then have them converted them to generic records automatically just before serialization?
Clients would then deserialize the generic records (after looking up the schema).
Also, could clients convert these generic records they received to specific ones at a later time? Some small code examples would be helpful!
Or are we looking at this all the wrong way?
What you are looking for is Confluent Schema registry service and libs which helps to integrate with this.
Providing a sample to write Serialize De-serialize avro data with a evolving schema. Please note providing sample from Kafka.
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import java.util.HashMap; import java.util.Map;
public class ConfluentSchemaService {
public static final String TOPIC = "DUMMYTOPIC";
private KafkaAvroSerializer avroSerializer;
private KafkaAvroDeserializer avroDeserializer;
public ConfluentSchemaService(String conFluentSchemaRigistryURL) {
//PropertiesMap
Map<String, String> propMap = new HashMap<>();
propMap.put("schema.registry.url", conFluentSchemaRigistryURL);
// Output afterDeserialize should be a specific Record and not Generic Record
propMap.put("specific.avro.reader", "true");
avroSerializer = new KafkaAvroSerializer();
avroSerializer.configure(propMap, true);
avroDeserializer = new KafkaAvroDeserializer();
avroDeserializer.configure(propMap, true);
}
public String hexBytesToString(byte[] inputBytes) {
return Hex.encodeHexString(inputBytes);
}
public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
return Hex.decodeHex(hexEncodedString.toCharArray());
}
public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
return avroSerializer.serialize(TOPIC, avroRecord);
}
public Object deserializeBytesToAvroPOJO(byte[] avroBytearray) {
return avroDeserializer.deserialize(TOPIC, avroBytearray);
} }
Following classes have all the code you are looking for.
io.confluent.kafka.serializers.KafkaAvroDeserializer;
io.confluent.kafka.serializers.KafkaAvroSerializer;