javaapache-kafkaspring-kafkajsonschemaconfluent-schema-registry

Send Record with JSON Schema to Kafka using Spring-Kafka and Confluent schema registry


I cannot find any information on the internet how to send a record with a json schema to kafka using spring kafka. How can I do that?


Solution

  • After spending several hours I figured out that there are 3 different ways how one can send a record with a json schema. The relevant part is implemented in io.confluent.kafka.schemaregistry.json.JsonSchemaUtils

    here the extract:

    if (isEnvelope(object)) {
      return getSchemaFromEnvelope((JsonNode) object);
    }
    Class<?> cls = object.getClass();
    if (cls.isAnnotationPresent(Schema.class)) {
      Schema schema = (Schema) cls.getAnnotation(Schema.class);
      List<SchemaReference> references = Arrays.stream(schema.refs())
              .map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
              .collect(Collectors.toList());
      if (client == null) {
        if (!references.isEmpty()) {
          throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
                  + " with refs " + references);
        }
        return new JsonSchema(schema.value());
      } else {
        return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
                .orElseThrow(() -> new IOException("Invalid schema " + schema.value()
                        + " with refs " + references));
      }
    }
    JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
    JsonSchemaDraft draft;
    switch (specVersion) {
      case DRAFT_4:
        draft = JsonSchemaDraft.DRAFT_04;
        break;
      case DRAFT_6:
        draft = JsonSchemaDraft.DRAFT_06;
        break;
      case DRAFT_7:
        draft = JsonSchemaDraft.DRAFT_07;
        break;
      case DRAFT_2019_09:
        draft = JsonSchemaDraft.DRAFT_2019_09;
        break;
      default:
        draft = JsonSchemaDraft.DRAFT_07;
        break;
    }
    config = config.withJsonSchemaDraft(draft);
    JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
    JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
    return new JsonSchema(jsonSchema);
    

    so you have 3 possiblities:

    1. create a JsonNode that contains a schema and a payload field
    2. annotate your Class with @Schema
    3. provide no schema and let it be generated by the schemagenerator

    I chose 1) using the following code:

    public class MyKafkaTemplate {
    
       private static final String SCHEMA_POSTFIX_KEY = "-key.json";
       private static final String SCHEMA_POSTFIX_VALUE = "-value.json";
       private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
       private final ObjectMapper objectMapper;
       private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();
    
       public <K, V> void send(final String topic, final K key, final V value) {
           final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
           final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
           kafkaTemplate.send(topic, keyNode, valueNode);
       }
    
       private JsonNode getEnvelope(final String schemaFilePath, final Object key) {
           final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
           final JsonNode payload = objectMapper.valueToTree(key);
           return JsonSchemaUtils.envelope(schemaNode, payload);
       }
    
       private JsonNode getOrLoadSchema(final String schemaFilePath) {
           return topicSchemaCache.computeIfAbsent(schemaFilePath, key -> 
            readFileToJsonNode(schemaFilePath));
       }
    
       @SneakyThrows
       private JsonNode readFileToJsonNode(final String schemaFilePath) {
           return objectMapper.readTree(new ClassPathResource(schemaFilePath).getFile());
       }
    
    }