Defining QuestDB table schema from Kafka Connect?


I am using Kafka Connect to get data into QuestDB. It works, but the table is created with default data types, which is not what I want.

I am passing this JSON to the Kafka Connect REST API to register the connector:

{
  "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
  "topics": "machine-events",
  "host": "questdb:9009",
  "name": "questdb-machine-events",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "include.key": false,
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "table": "machine_events",
  "value.converter.schemas.enable": false
}
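For reference, the Kafka Connect worker accepts that flat config map directly on its REST API. Below is a minimal Python sketch of submitting it with only the standard library. It assumes a worker reachable at http://localhost:8083 (8083 is Kafka Connect's usual default REST port) and uses PUT /connectors/<name>/config, which creates the connector or updates it if it already exists. The helper names are hypothetical.

```python
import json
import urllib.request

# Hypothetical worker address; 8083 is Kafka Connect's default REST port.
CONNECT_URL = "http://localhost:8083"

# The connector config from the question, as a Python dict.
config = {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "topics": "machine-events",
    "host": "questdb:9009",
    "name": "questdb-machine-events",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "include.key": False,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "table": "machine_events",
    "value.converter.schemas.enable": False,
}


def build_config_request(connect_url: str, cfg: dict) -> urllib.request.Request:
    """Build a PUT /connectors/<name>/config request (create or update)."""
    url = f"{connect_url}/connectors/{cfg['name']}/config"
    body = json.dumps(cfg).encode("utf-8")  # Python False -> JSON false
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def register_connector(connect_url: str, cfg: dict) -> dict:
    """Send the request to a running worker and return its JSON response."""
    req = build_config_request(connect_url, cfg)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

With a worker running, register_connector(CONNECT_URL, config) registers the connector. Because PUT is idempotent, re-running it after editing the dict updates the existing connector instead of failing with a conflict.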

The connector inserts data with no errors, but I need to define some columns as Symbols and some as Strings. Is there any way to achieve this?


Solution

  • The easiest option is to create the table beforehand with a CREATE TABLE statement, which gives you full control over all the table parameters.

    If you would rather have Kafka Connect create the table dynamically, you can add extra keys to that JSON to specify the type of ambiguous columns. The symbols, doubles, timestamp.field.name, and timestamp.string.fields keys make sure values in those columns are not misinterpreted as Strings or Longs.

    Suppose you have four columns that look like strings, and you want two of them created as Symbols and the other two as Strings. You could pass this configuration:

    {
      "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
      "topics": "machine-events",
      "host": "questdb:9009",
      "name": "questdb-machine-events",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "include.key": false,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "table": "machine_events",
      "value.converter.schemas.enable": false,
      "symbols": "column1,column2"
    }
    

    The two columns not listed there will be created as Strings.
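For the first option, creating the table before the connector starts, the statement can be sent through QuestDB's HTTP /exec endpoint (served on port 9000 by default). Below is a minimal Python sketch. It assumes the four example columns column1 to column4 from the configuration above, plus a hypothetical designated timestamp column named ts partitioned by day; the helper names and the localhost address are also assumptions, so adjust names, types, and partitioning to the real data.

```python
import urllib.parse
import urllib.request

# Hypothetical QuestDB address; 9000 is QuestDB's default HTTP port.
QUESTDB_HTTP = "http://localhost:9000"


def build_create_table_sql(table: str, symbols: list, strings: list,
                           ts_column: str = "ts") -> str:
    """Build a QuestDB CREATE TABLE statement with explicit column types."""
    columns = [f"{name} SYMBOL" for name in symbols]
    columns += [f"{name} STRING" for name in strings]
    columns.append(f"{ts_column} TIMESTAMP")
    # TIMESTAMP(...) marks the designated timestamp column.
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ({', '.join(columns)}) "
        f"TIMESTAMP({ts_column}) PARTITION BY DAY"
    )


def exec_url(base_url: str, sql: str) -> str:
    """URL for QuestDB's /exec endpoint with the query URL-encoded."""
    return f"{base_url}/exec?{urllib.parse.urlencode({'query': sql})}"


def create_table(base_url: str, sql: str) -> bytes:
    """Run the statement on a live server and return the raw JSON reply."""
    with urllib.request.urlopen(exec_url(base_url, sql)) as resp:
        return resp.read()


# Hypothetical schema: two SYMBOL columns and two STRING columns.
SQL = build_create_table_sql(
    "machine_events",
    symbols=["column1", "column2"],
    strings=["column3", "column4"],
)
```

With QuestDB running, call create_table(QUESTDB_HTTP, SQL) once before registering the connector, so the table already exists with the types you chose. IF NOT EXISTS makes the call safe to repeat.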