I am trying to run Kafka in KRaft mode, but I keep running into errors. Here is my best attempt so far (I used GPT to generate this, as I failed to find the relevant parts of the documentation):
docker-compose.yml
services:
  # Kafka controller: manages the overall state of the Kafka cluster
  kafka-controller-0:
    image: apache/kafka:3.7.1
    container_name: kafka-controller-0
    environment:
      KAFKA_BROKER_ID: 0
      # Listener for the controller and broker
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9090
      # Port for Raft communication (consensus)
      KAFKA_RAFT_PORT: 9091
      # Directory for Raft and Kafka data
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-controller-0:/var/lib/kafka/data
    ports:
      # expose broker port
      - "9092:9092"
    networks:
      - kafka-net
  # Kafka broker: stores and manages partitions of topics
  kafka-broker-0:
    image: apache/kafka:3.7.1
    container_name: kafka-broker-0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092
      KAFKA_RAFT_PORT: 9091
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-broker-0:/var/lib/kafka/data
    ports:
      # expose broker port
      - "9093:9092"
    networks:
      - kafka-net
  kafka-broker-1:
    image: apache/kafka:3.7.1
    container_name: kafka-broker-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_CONTROLLER_LISTENERS: PLAINTEXT://:9092
      KAFKA_RAFT_PORT: 9091
      KAFKA_RAFT_DATA_DIR: /var/lib/kafka/data
      KAFKA_LOG_DIRS: /var/lib/kafka/data
    volumes:
      - ./data/kafka-broker-1:/var/lib/kafka/data
    ports:
      - "9094:9092"
    networks:
      - kafka-net
networks:
  kafka-net:
Unfortunately, this fails with the following error:
Attaching to kafka-broker-0, kafka-broker-1, kafka-controller-0
kafka-controller-0 | ===> User
kafka-broker-0 | ===> User
kafka-broker-0 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-controller-0 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-broker-0 | ===> Setting default values of environment variables if not already set.
kafka-controller-0 | ===> Setting default values of environment variables if not already set.
kafka-controller-0 | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-broker-0 | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-controller-0 | ===> Configuring ...
kafka-broker-0 | ===> Configuring ...
kafka-controller-0 | ===> Launching ...
kafka-broker-0 | ===> Launching ...
kafka-broker-0 | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-controller-0 | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-broker-1 | ===> User
kafka-broker-1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka-broker-1 | ===> Setting default values of environment variables if not already set.
kafka-broker-1 | CLUSTER_ID not set. Setting it to default value: "5L6g3nShT-eMCtK--X86sw"
kafka-broker-1 | ===> Configuring ...
kafka-broker-1 | ===> Launching ...
kafka-broker-1 | ===> Using provided cluster id 5L6g3nShT-eMCtK--X86sw ...
kafka-broker-0 | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-broker-0 |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-broker-0 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-broker-0 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-broker-0 |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-broker-0 |     at scala.Option.flatMap(Option.scala:283)
kafka-broker-0 |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-broker-0 |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-broker-0 |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-broker-0 |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-controller-0 | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-controller-0 |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-controller-0 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-controller-0 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-controller-0 |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-controller-0 |     at scala.Option.flatMap(Option.scala:283)
kafka-controller-0 |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-controller-0 |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-controller-0 |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-controller-0 |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-broker-1 | Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration `zookeeper.connect` which has no default value.
kafka-broker-1 |     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2299)
kafka-broker-1 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2290)
kafka-broker-1 |     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1638)
kafka-broker-1 |     at kafka.tools.StorageTool$.$anonfun$execute$1(StorageTool.scala:71)
kafka-broker-1 |     at scala.Option.flatMap(Option.scala:283)
kafka-broker-1 |     at kafka.tools.StorageTool$.execute(StorageTool.scala:71)
kafka-broker-1 |     at kafka.tools.StorageTool$.main(StorageTool.scala:52)
kafka-broker-1 |     at kafka.docker.KafkaDockerWrapper$.main(KafkaDockerWrapper.scala:47)
kafka-broker-1 |     at kafka.docker.KafkaDockerWrapper.main(KafkaDockerWrapper.scala)
kafka-controller-0 exited with code 1
kafka-broker-1 exited with code 1
kafka-broker-0 exited with code 1
Adding KAFKA_ZOOKEEPER_CONNECT: "" to all containers removes this error but triggers a new one:
kafka-broker-0 | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
kafka-broker-1 | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
kafka-controller-0 | The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.
What is the correct way to achieve this, and is there any good documentation that would help me proceed more systematically?
I am using Kafka to ingest data into QuestDB, and this compose is working for me. I define two different brokers/controllers, so I have a common definition at the top and then include it in both brokers.
(Updated) I am also including my configuration for two Kafka Connect instances and a single Schema Registry. In a production environment you might want more than one Schema Registry for high availability, and you might want to separate the controller and broker roles instead of running combined nodes, but hopefully this helps you get started.
Note that I am also including, for your reference, how to automatically register a Kafka Connect connector on startup. For this to work, you need the connector jar (or you can download it from Confluent Hub). In my case, I mount the local folder ./kafka-connect-plugins, where I keep the jar file for the QuestDB connector, and then reference it in the workers' config. As you can see, the connector needs to be registered on only one of the Kafka Connect workers, and it will automatically be available to the rest of the workers in the cluster.
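If you don't already have the jar, something along these lines can populate that folder. This is only a hedged sketch: the release URL pattern, version, and archive name are assumptions, so check the QuestDB Kafka connector releases page for the real ones before running it.

# Hypothetical download of the QuestDB Kafka connector into the folder that
# the compose file below mounts at /etc/kafka-connect/jars. Verify the actual
# version and asset name at https://github.com/questdb/kafka-questdb-connector/releases
VERSION=0.12   # assumption: replace with the latest release version
mkdir -p kafka-connect-plugins
curl -fsSL -o /tmp/questdb-connector.zip \
  "https://github.com/questdb/kafka-questdb-connector/releases/download/v${VERSION}/kafka-questdb-connector-${VERSION}-bin.zip"
unzip -j /tmp/questdb-connector.zip '*.jar' -d kafka-connect-plugins

With the plugin jar in place, here is the full docker-compose.yml: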
# Common kafka broker environment variables
x-kafka-broker-env: &kafka-broker-env
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
  KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
  KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
  KAFKA_JMX_HOSTNAME: localhost
  KAFKA_ENABLE_KRAFT: 'yes'
  KAFKA_PROCESS_ROLES: 'broker,controller'
  KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093,2@broker-2:29093'
  KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
  KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
  KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
  KAFKA_METADATA_LOG_DIR: /tmp/kraft-metadata-logs
  # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
  # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
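  # For example (a hypothetical invocation; any random UUID works, assuming the
  # Confluent image ships a kafka-storage wrapper on the PATH):
  #   docker run --rm confluentinc/cp-kafka:7.7.0 kafka-storage random-uuid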
  CLUSTER_ID: 'MTkwSDkzMDg5KTdFNDJCRU'

# Common kafka broker configuration
x-kafka-broker-common: &kafka-broker-common
  image: confluentinc/cp-kafka:7.7.0
  extra_hosts:
    - "host.docker.internal:host-gateway"

# Common kafka connect environment variables
x-kafka-connect-env: &kafka-connect-env
  CONNECT_BOOTSTRAP_SERVERS: 'broker:29092,broker-2:29092'
  CONNECT_GROUP_ID: compose-connect-group
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: 500
  CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 2
  CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 2
  CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 2
  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  # Must match the hostname of the schema registry service defined below
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
  # CLASSPATH required due to CC-2422
  CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3-0.jar
  CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
  CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
  CONNECT_ERRORS_RETRY_TIMEOUT: 90000
  CONNECT_ERRORS_RETRY_DELAY_MAX_MS: 120000
  CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components"
  QUESTDB_HTTP_ENDPOINT: "${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000}"

# Common kafka connect configuration
x-kafka-connect-common: &kafka-connect-common
  image: confluentinc/cp-kafka-connect:7.7.0
  depends_on:
    - broker-1
    - broker-2
  extra_hosts:
    - "host.docker.internal:host-gateway"
  volumes:
    - ./kafka-connect-plugins:/etc/kafka-connect/jars
services:
  questdb:
    image: questdb/questdb:8.1.1
    container_name: rta_questdb
    restart: always
    ports:
      - "8812:8812"
      - "9000:9000"
      - "9009:9009"
      - "9003:9003"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - QDB_METRICS_ENABLED=TRUE
    volumes:
      - ./questdb/questdb_root:/var/lib/questdb/:rw

  broker-1:
    <<: *kafka-broker-common
    hostname: broker
    container_name: rta_kafka_broker
    ports:
      - "9092:9092"
      - "9101:9101"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./broker-1/kafka-data:/var/lib/kafka/data
      - ./broker-1/kafka-secrets:/etc/kafka/secrets
      - ./broker-1/tmp:/tmp
    environment:
      <<: *kafka-broker-env
      KAFKA_NODE_ID: 1
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_JMX_PORT: 9101
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'

  broker-2:
    <<: *kafka-broker-common
    hostname: broker-2
    container_name: rta_kafka_broker_2
    ports:
      - "9093:9093"
      - "9102:9102"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./broker-2/kafka-data:/var/lib/kafka/data
      - ./broker-2/kafka-secrets:/etc/kafka/secrets
      - ./broker-2/tmp:/tmp
    environment:
      <<: *kafka-broker-env
      KAFKA_NODE_ID: 2
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:29092,PLAINTEXT_HOST://localhost:9093'
      KAFKA_JMX_PORT: 9102
      KAFKA_LISTENERS: 'PLAINTEXT://broker-2:29092,CONTROLLER://broker-2:29093,PLAINTEXT_HOST://0.0.0.0:9093'

  schema_registry:
    image: confluentinc/cp-schema-registry:7.7.0
    hostname: schema_registry
    container_name: rta_schema_registry
    depends_on:
      - broker-1
      - broker-2
    ports:
      - "8081:8081"
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092,broker-2:29092'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'

  kafka-connect-1:
    <<: *kafka-connect-common
    hostname: kafka-connect
    container_name: rta_kafka_connect
    ports:
      - "8083:8083"
    environment:
      <<: *kafka-connect-env
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_LISTENERS: http://0.0.0.0:8083
    command:
      - bash
      - -c
      - |
        # Launch Kafka Connect in the background
        /etc/confluent/docker/run &
        #
        # Wait for the Kafka Connect REST listener to come up
        echo "Waiting for Kafka Connect to start listening on localhost ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        echo -e "\n--\n+> Registering QuestDB Connector"
        curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/questdb-trades/config -d '{
          "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
          "tasks.max": "5",
          "topics": "trades",
          "client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT:-host.docker.internal:9000};",
          "name": "questdb-trades",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema_registry:8081",
          "include.key": false,
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema_registry:8081",
          "table": "trades",
          "symbols": "symbol, side",
          "timestamp.field.name": "timestamp",
          "value.converter.schemas.enable": true
        }'
        # Keep the container alive after registration
        sleep infinity

  kafka-connect-2:
    <<: *kafka-connect-common
    hostname: kafka-connect-2
    container_name: rta_kafka_connect_2
    ports:
      - "8084:8084"
    environment:
      <<: *kafka-connect-env
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect-2
      CONNECT_LISTENERS: http://0.0.0.0:8084
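To try it out, bring the stack up and wait for the registration loop in kafka-connect-1 to report HTTP 200. The connector is registered against a single worker, but because both workers share the same CONNECT_GROUP_ID and config storage topic, it shows up on both. A minimal check, using only the ports and names from the compose above:

docker compose up -d
# Worker 1: expect ["questdb-trades"] once registration has completed
curl -s http://localhost:8083/connectors
# Worker 2: same list, shared through the connect group
curl -s http://localhost:8084/connectors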