apache-kafkaapache-kafka-connectquestdb

Sending data to QuestDB using Kafka Connect and the HTTP transport, rather than TCP


I've been using QuestDB (both OSS and QuestDB Cloud) ingesting data via Kafka Connect. Currently I am using the TCP transport, but now that HTTP is supported I want to migrate to get better error handling.

 {
      "connector.class" : "io.questdb.kafka.QuestDBSinkConnector",
      "host" : "44e065ec-XXXXX-XXXXXX.ilp.b1t9.questdb.com:31462",
      "include.key" : "false",
      "key.converter" : "io.confluent.connect.avro.AvroConverter",
      "key.converter.schema.registry.url" : "http://schema_registry:8081",
      "name" : "questdb-trades",
      "symbols" : "symbol, side",
      "table" : "trades",
      "timestamp.field.name" : "timestamp",
      "tls" : "true",
      "token" : "XXXXXXXXXXX",
      "topics" : "trades",
      "username" : "admin",
      "value.converter" : "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url" : "http://schema_registry:8081",
      "value.converter.schemas.enable" : "true"
   }
  

I know the HTTP transport expects data on the API port, not the TCP port. I changed that on the host string, but data is never making it into the database. If I change back to the TCP port, it all works.

What am I doing wrong?


Solution

  • QuestDB uses the API port for the HTTP transport indeed, but it is not the only change. When using HTTP, the credentials are not the ILP token, but the user (or service account) password.

    The host, tls, username, and token properties are still available for backwards compatibility, but for using HTTP you need to use the new client_conf_string

    {
              "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
              "topics": "trades",
              "client.conf.string": "https::addr=44e065ec-XXXXX-XXXXXX.ilp.b1t9.questdb.com:443;username=admin;password=XXXXXXX;",
              "name": "questdb-trades",
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.schema.registry.url": "http://schema_registry:8081",
              "include.key": false,
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.schema.registry.url": "http://schema_registry:8081",
              "table": "trades",
              "symbols": "symbol, side",
              "timestamp.field.name": "timestamp",
              "value.converter.schemas.enable": true
          }'
    

    You specify the transport (tcp/tcps/http, or https) directly as part of the connection string, as well as the credentials. Everything else remains the same.