I cannot find any information online about how to send a record with a JSON Schema to Kafka using Spring Kafka. How can I do that?
After several hours of digging, I found that there are three different ways to send a record with a JSON Schema. The relevant logic is implemented in io.confluent.kafka.schemaregistry.json.JsonSchemaUtils.
Here is the extract:
if (isEnvelope(object)) {
    return getSchemaFromEnvelope((JsonNode) object);
}
Class<?> cls = object.getClass();
if (cls.isAnnotationPresent(Schema.class)) {
    Schema schema = (Schema) cls.getAnnotation(Schema.class);
    List<SchemaReference> references = Arrays.stream(schema.refs())
        .map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
        .collect(Collectors.toList());
    if (client == null) {
        if (!references.isEmpty()) {
            throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
                + " with refs " + references);
        }
        return new JsonSchema(schema.value());
    } else {
        return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
            .orElseThrow(() -> new IOException("Invalid schema " + schema.value()
                + " with refs " + references));
    }
}
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
switch (specVersion) {
    case DRAFT_4:
        draft = JsonSchemaDraft.DRAFT_04;
        break;
    case DRAFT_6:
        draft = JsonSchemaDraft.DRAFT_06;
        break;
    case DRAFT_7:
        draft = JsonSchemaDraft.DRAFT_07;
        break;
    case DRAFT_2019_09:
        draft = JsonSchemaDraft.DRAFT_2019_09;
        break;
    default:
        draft = JsonSchemaDraft.DRAFT_07;
        break;
}
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
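So you have three possibilities, matching the three branches in the extract:
1) Send an envelope: a JsonNode that carries both the schema and the payload (the isEnvelope branch).
2) Annotate the class of the object you send with @Schema and put the schema text in the annotation.
3) Do neither, and let the schema be generated from the class by JsonSchemaGenerator.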
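To make the order of the three branches in the extract easier to follow (envelope first, then the `@Schema` annotation, then schema generation), here is a toy model that runs on the JDK alone. It is only a sketch: the `Schema` annotation, the `Strategy` enum, the two order classes, and the `Map`-based envelope are hypothetical stand-ins, not the Confluent types, and it assumes the envelope is an object with `schema` and `payload` fields.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Map;

// Toy model only: every type here is a stand-in, not a Confluent class.
final class SchemaResolutionSketch {

    enum Strategy { ENVELOPE, ANNOTATION, GENERATED }

    // Hypothetical stand-in for the @Schema annotation used in the extract.
    @Retention(RetentionPolicy.RUNTIME)
    @interface Schema {
        String value();
    }

    // Option 2: the schema text is attached to the class itself.
    @Schema("{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"}}}")
    static final class AnnotatedOrder {
        String id = "42";
    }

    // Option 3: a plain class; a real serializer would derive the schema from it.
    static final class PlainOrder {
        String id = "42";
    }

    // Same order of checks as the extract: envelope, then annotation, then generation.
    static Strategy resolve(Object object) {
        if (object instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) object;
            // Assumption: an envelope is an object holding "schema" and "payload".
            if (map.containsKey("schema") && map.containsKey("payload")) {
                return Strategy.ENVELOPE;
            }
        }
        if (object.getClass().isAnnotationPresent(Schema.class)) {
            return Strategy.ANNOTATION;
        }
        return Strategy.GENERATED;
    }

    public static void main(String[] args) {
        Map<String, Object> envelope =
            Map.of("schema", "{\"type\":\"string\"}", "payload", "hello");
        System.out.println(resolve(envelope));              // prints ENVELOPE
        System.out.println(resolve(new AnnotatedOrder()));  // prints ANNOTATION
        System.out.println(resolve(new PlainOrder()));      // prints GENERATED
    }
}
```

The envelope check comes first, so an envelope wins even when the payload class could also be annotated or introspected.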
I chose 1) using the following code:
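import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.SneakyThrows;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.KafkaTemplate;

public class MyKafkaTemplate {
    private static final String SCHEMA_POSTFIX_KEY = "-key.json";
    private static final String SCHEMA_POSTFIX_VALUE = "-value.json";
    private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
    private final ObjectMapper objectMapper;
    // Parsed schema files, cached by classpath location.
    private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();

    public MyKafkaTemplate(final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate,
                           final ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    // Expects <topic>-key.json and <topic>-value.json on the classpath.
    public <K, V> void send(final String topic, final K key, final V value) {
        final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
        final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
        kafkaTemplate.send(topic, keyNode, valueNode);
    }

    // Wraps the object, together with its schema, in the envelope the serializer detects.
    private JsonNode getEnvelope(final String schemaFilePath, final Object object) {
        final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
        final JsonNode payload = objectMapper.valueToTree(object);
        return JsonSchemaUtils.envelope(schemaNode, payload);
    }

    private JsonNode getOrLoadSchema(final String schemaFilePath) {
        return topicSchemaCache.computeIfAbsent(schemaFilePath, this::readFileToJsonNode);
    }

    @SneakyThrows
    private JsonNode readFileToJsonNode(final String schemaFilePath) {
        // getInputStream() also works when the schema sits inside a packaged JAR; getFile() does not.
        try (InputStream in = new ClassPathResource(schemaFilePath).getInputStream()) {
            return objectMapper.readTree(in);
        }
    }
}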
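For the envelope to be picked up, the underlying `KafkaTemplate<JsonNode, JsonNode>` has to use the Confluent JSON Schema serializer for both key and value. The following is a minimal sketch of the producer settings, written with plain string keys so it runs on the JDK alone. The class name `JsonSchemaProducerProps` is hypothetical, and the broker and Schema Registry addresses are placeholder assumptions, not values from the original setup.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of producer settings for a KafkaTemplate<JsonNode, JsonNode>.
final class JsonSchemaProducerProps {

    private static final String JSON_SCHEMA_SERIALIZER =
        "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer";

    // Both arguments are environment specific; the values used in main are placeholders.
    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        // Key and value are both sent as envelopes, so both use the same serializer.
        props.put("key.serializer", JSON_SCHEMA_SERIALIZER);
        props.put("value.serializer", JSON_SCHEMA_SERIALIZER);
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = build("localhost:9092", "http://localhost:8081");
        System.out.println(props.get("schema.registry.url")); // prints http://localhost:8081
    }
}
```

In a Spring application, a map like this can be passed to a `DefaultKafkaProducerFactory`, which in turn is handed to the `KafkaTemplate` constructor; the same keys can also be set through Spring Boot's producer properties if you prefer configuration files.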