apache-kafka jsonschema confluent-schema-registry confluent-rest-proxy

Kafka Rest Proxy JSON schema validation


I want to produce to a Kafka topic via Kafka REST Proxy. I've created a JSON schema in the Schema Registry, and I would like all messages to be validated against the registered schema and rejected if they don't match it.

My Schema

{
  "type": "object",
  "properties": {
    "foo": {
      "type": "string"
    },
    "bar": {
      "type": "number" 
    }
  }
}

This schema is registered correctly and assigned version 1. Then I try to produce a message with the wrong data type for both foo and bar but the message is accepted.

curl --location --request POST 'http://localhost:8082/topics/test' \
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
--header 'Accept: application/vnd.kafka.v2+json' \
--data-raw '{
    "value_schema_id": 1,
    "records": [
        {
            "value": {
                "foo": 10,
                "bar":"not a number"
            }
        }
    ]
}'

Notice that I'm producing to the test topic, which has a schema associated with it, but the faulty message gets accepted anyway. I also tried adding "value_schema_id": 1 to make sure the schema is referenced in the payload, but the faulty message is still accepted.

However, if I pass the JSON schema inline as value_schema, validation works as expected:

{
    "value_schema": "{\"type\": \"object\",\"properties\": {\"foo\": {\"type\": \"string\"},\"bar\": {\"type\": \"number\"}}}",
    "records": [
        {
            "value": {
                "foo": "10",
                "bar": "1"
            }
        }
    ]
}

Response

{
    "error_code": 42203,
    "message": "Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/bar: expected type: Number, found: String"
}

Question: Is it possible to reference an existing schema id when producing messages without having to pass the whole JSON schema each time?


Solution

  • Yes, it's possible, but you have to enable schema validation on the topic for the value, the key, or both:

    confluent.value.schema.validation=true 
    confluent.key.schema.validation=true
    

    Also, in your JSON request you can specify the ID of your schema, for example:

    {
      "key_schema_id": 1234,
      "value_schema_id": 56789,
      "records": [
        {
          "key": "key123",
          "value": "my_simple_string_value_example"
        }
      ]
    }
    

    Where:

    key_schema_id

    Is the ID of the key schema in Schema Registry. Setting it to the appropriate value guarantees that only messages whose key complies with the schema identified by that ID are accepted.

    Similarly:

    value_schema_id

    Is the ID of the value schema in Schema Registry. Setting it to the appropriate value guarantees that only messages whose value complies with the schema identified by that ID are accepted.

    Hope that helps to shed some light.

    Cheers,

    Eduardo Ponzoni
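
For anyone who prefers a script over curl, here is a minimal Python sketch of the request described in the answer above. It references the registered schema by ID instead of passing the whole schema each time. The URL, topic name, and schema ID 1 are taken from the question. The helper names build_produce_request and produce are my own and hypothetical, not part of any Confluent client library. It only uses the standard library and does not validate anything locally; acceptance or rejection is still decided on the server side.

```python
import json
import urllib.request

# Assumed values copied from the question; adjust for your environment.
REST_PROXY_URL = "http://localhost:8082"
TOPIC = "test"


def build_produce_request(records, value_schema_id=None, key_schema_id=None):
    """Return (headers, body_bytes) for a REST Proxy v2 JSON Schema produce call.

    Hypothetical helper: it only assembles the payload shown in the answer,
    referencing schemas by ID rather than embedding value_schema/key_schema.
    """
    payload = {}
    if key_schema_id is not None:
        payload["key_schema_id"] = key_schema_id
    if value_schema_id is not None:
        payload["value_schema_id"] = value_schema_id
    payload["records"] = list(records)
    headers = {
        "Content-Type": "application/vnd.kafka.jsonschema.v2+json",
        "Accept": "application/vnd.kafka.v2+json",
    }
    return headers, json.dumps(payload).encode("utf-8")


def produce(records, value_schema_id, base_url=REST_PROXY_URL, topic=TOPIC):
    """POST the records to the topic and return the decoded JSON response.

    urllib raises urllib.error.HTTPError for 4xx/5xx responses, which is how
    a rejected message (such as the 42203 error in the question) would surface.
    """
    headers, body = build_produce_request(records, value_schema_id=value_schema_id)
    request = urllib.request.Request(
        f"{base_url}/topics/{topic}", data=body, headers=headers, method="POST"
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

With a REST Proxy running locally, calling produce([{"value": {"foo": "10", "bar": 1}}], value_schema_id=1) sends a record that matches the schema from the question. Swapping in the faulty record from the question is a quick way to check whether validation is actually being enforced.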