
In ksqlDB, why is a column name from the schema definition not resolved when used in a SELECT?


I have this JSON Schema:

{
  "$id": "http://schema-registry:8081/schemas/ids/1",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Locationvalue",
  "type": "object",
  "properties": {
    "profileId": {
      "type": "string",
      "description": "The id of the location."
    },
    "latitude": {
      "type": "number",
      "minimum": -90,
      "maximum": 90,
      "description": "The location's latitude."
    },
    "longitude": {
      "type": "number",
      "minimum": -180,
      "maximum": 180,
      "description": "The location's longitude."
    }
  }
}

which I post to schema-registry

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$(cat schema-value-registry.json)" \
    http://localhost:8081/subjects/locations-value/versions
{"id":1}

using this input

{
  "schemaType":"JSON",
  "schema":"{\"$id\":\"http://schema-registry:8081/schemas/ids/1\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"title\":\"Locationvalue\",\"type\":\"object\",\"properties\":{\"profileId\":{\"type\":\"string\",\"description\":\"The id of the location.\"},\"latitude\":{\"type\":\"number\",\"minimum\":-90,\"maximum\":90,\"description\":\"The location's latitude.\"},\"longitude\":{\"type\":\"number\",\"minimum\":-180,\"maximum\":180,\"description\":\"The location's longitude.\"}}}"
}

I push two messages to the topic locations

kafka-json-schema-console-producer \
    --bootstrap-server broker:9092 \
    --topic locations \
    --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
    --property key.separator=":" \
    --property parse.key=true \
    --property schema.registry.url=http://schema-registry:8081 \
    --property value.schema.id=1

"asd":{"profileId":"asd","latitude":0,"longitude":-1}
"asdfghjkl":{"profileId":"asdfghjkl","latitude":90.000,"longitude":-180.000}

In ksqlDB I can see the topic

ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 default_ksql_processing_log | 1          | 1
 docker-connect-configs      | 1          | 1
 docker-connect-offsets      | 25         | 1
 docker-connect-status       | 5          | 1
 locations                   | 1          | 1
---------------------------------------------------------------

I can create a table

ksql> CREATE SOURCE TABLE locations (
    rowkey VARCHAR PRIMARY KEY
) WITH (
    KAFKA_TOPIC = 'locations',
    VALUE_FORMAT = 'JSON_SR',
    VALUE_SCHEMA_ID = 1
);


 Message
---------------------------------------
 Created query with ID CST_LOCATIONS_1
---------------------------------------
ksql> describe locations;

Name                 : LOCATIONS
 Field     | Type
--------------------------------------------
 ROWKEY    | VARCHAR(STRING)  (primary key)
 profileId | VARCHAR(STRING)
 latitude  | DOUBLE
 longitude | DOUBLE
--------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

I can query the table

ksql> SELECT * FROM locations;
+---------------+---------------+---------------+---------------+
|ROWKEY         |profileId      |latitude       |longitude      |
+---------------+---------------+---------------+---------------+
|"asd"          |asd            |0.0            |-1.0           |
|"asdfghjkl"    |asdfghjkl      |90.0           |-180.0         |
Query terminated
ksql> SELECT * FROM locations WHERE ROWKEY = '"asd"';
+---------------+---------------+---------------+---------------+
|ROWKEY         |profileId      |latitude       |longitude      |
+---------------+---------------+---------------+---------------+
|"asd"          |asd            |0.0            |-1.0           |
Query terminated

But when I try to filter on one of the columns from the schema definition I get this error

ksql> SELECT * FROM locations WHERE profiledId = 'asd';
Line: 1, Col: 31: WHERE column 'PROFILEDID' cannot be resolved.
ksql> SELECT * FROM locations WHERE latitude = 0.0;
Line: 1, Col: 31: WHERE column 'LATITUDE' cannot be resolved.
ksql> SELECT * FROM locations WHERE longitude = -1.0;
Line: 1, Col: 31: WHERE column 'LONGITUDE' cannot be resolved.

Writing the column names in all upper case does not change the errors.

How can I use the fields from the schema definition in my query statements?


Solution

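  • Use backticks. ksqlDB converts unquoted identifiers to upper case, so latitude is looked up as LATITUDE, while the columns inferred from the schema keep their original case (profileId, latitude, longitude). That is also why writing the names in upper case does not help. Quoting an identifier with backticks preserves its case. Note that the column is named profileId; the query in the question misspells it as profiledId.

    select * from locations where `profileId` = 'asd';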
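
The same quoting applies to the other two columns. What follows is a minimal sketch that assumes the locations table from the question; it has not been run against a live server, and whether a pull query may filter on a non-key column depends on the ksqlDB version and its configuration. The alias profile_id is only an illustrative name, not something from the question.

```sql
-- Backticks preserve case, so these names match the inferred columns.
SELECT * FROM locations WHERE `latitude` = 0.0;
SELECT * FROM locations WHERE `longitude` = -1.0;

-- Hypothetical alias: an unquoted alias is stored in upper case (PROFILE_ID),
-- so the result column no longer needs backticks.
SELECT `profileId` AS profile_id, `latitude`, `longitude`
FROM locations
WHERE `profileId` = 'asd';
```

Quoting is needed anywhere the column is referenced (SELECT list, WHERE, GROUP BY), not only in the filter.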