I have this json-schema:
{
"$id": "http://schema-registry:8081/schemas/ids/1",
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Locationvalue",
"type": "object",
"properties": {
"profileId": {
"type": "string",
"description": "The id of the location."
},
"latitude": {
"type": "number",
"minimum": -90,
"maximum": 90,
"description": "The location's latitude."
},
"longitude": {
"type": "number",
"minimum": -180,
"maximum": 180,
"description": "The location's longitude."
}
}
}
which I post to schema-registry
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data "$(cat schema-value-registry.json)" \
http://localhost:8081/subjects/locations-value/versions
{"id":1}
using this input
{
"schemaType":"JSON",
"schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Locationvalue\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}
I push a message to topic locations
kafka-json-schema-console-producer \
--bootstrap-server broker:9092 \
--topic locations \
--property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
--property key.separator=":" \
--property parse.key=true \
--property schema.registry.url=http://schema-registry:8081 \
--property value.schema.id=1
"asd":{"profileId":"asd","latitude":0,"longitude":-1}
"asdfghjkl":{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}
In kslqdb I can see the topic
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
default_ksql_processing_log | 1 | 1
docker-connect-configs | 1 | 1
docker-connect-offsets | 25 | 1
docker-connect-status | 5 | 1
locations | 1 | 1
---------------------------------------------------------------
I can create a table
ksql> CREATE SOURCE TABLE locations (
rowkey VARCHAR PRIMARY KEY
) WITH (
KAFKA_TOPIC = 'locations',
VALUE_FORMAT = 'JSON_SR',
VALUE_SCHEMA_ID = 1
);
Message
---------------------------------------
Created query with ID CST_LOCATIONS_1
---------------------------------------
ksql> describe locations;
Name : LOCATIONS
Field | Type
--------------------------------------------
ROWKEY | VARCHAR(STRING) (primary key)
profileId | VARCHAR(STRING)
latitude | DOUBLE
longitude | DOUBLE
--------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
I can query the table
ksql> SELECT * FROM locations;
+---------------+---------------+---------------+---------------+
|ROWKEY |profileId |latitude |longitude |
+---------------+---------------+---------------+---------------+
|"asd" |asd |0.0 |-1.0 |
|"asdfghjkl" |asdfghjkl |90.0 |-180.0 |
Query terminated
ksql> SELECT * FROM locations WHERE ROWKEY = '"asd"';
+---------------+---------------+---------------+---------------+
|ROWKEY |profileId |latitude |longitude |
+---------------+---------------+---------------+---------------+
|"asd" |asd |0.0 |-1.0 |
Query terminated
But when I try to filter on one of the columns from the schema definition I get this error
ksql> SELECT * FROM locations WHERE profiledId = 'asd';
Line: 1, Col: 31: WHERE column 'PROFILEDID' cannot be resolved.
ksql> SELECT * FROM locations WHERE latitude = 0.0;
Line: 1, Col: 31: WHERE column 'LATITUDE' cannot be resolved.
ksql> SELECT * FROM locations WHERE longitude = -1.0;
Line: 1, Col: 31: WHERE column 'LONGITUDE' cannot be resolved.
Writing the column names in all upper case does not change the errors.
How can I use the fields from the schema-defintion in my query statements?
Use backticks
select * from locations where `profiledId` = 'asd';