I'm working in a Kubernetes environment where we deploy Kafka and related services using a custom Helm chart. Here's the stack:
- Strimzi 0.46.0, deployed in KRaft mode, Kafka version 4.0.0
- quay.io/strimzi/kafka:0.46.0-kafka-4.0.0, extended with ojdbc17.jar for Oracle
- docker.io/bitnami/schema-registry:7.9.0
- Brokers with auto.create.topics.enable: true

The Issue:
My KafkaConnect cluster creates its required internal topics just fine:

- connect-cluster-configs
- connect-cluster-offsets
- connect-cluster-status

However, when I deploy a KafkaConnector (Debezium Oracle source connector), it fails to auto-create any topics (e.g., the topics for schema.history.internal.kafka.topic, database.history.kafka.topic, or for the data tables themselves). There is no clear exception, but we see timeouts in the logs:
```log
2025-06-03 14:56:28 INFO [kafka-producer-network-thread | next-schemahistory] NetworkClient:411 - [Producer clientId=next-schemahistory] Cancelled in-flight METADATA request with correlation id 126 due to node -1 being disconnected (elapsed time since creation: 1ms, elapsed time since send: 1ms, throttle time: 0ms, request timeout: 30000ms)
2025-06-03 14:56:28 WARN [kafka-producer-network-thread | next-schemahistory] NetworkClient:1255 - [Producer clientId=next-schemahistory] Bootstrap broker kafka-kafka-bootstrap:9092 (id: -1 rack: null isFenced: false) disconnected
2025-06-03 14:56:28 INFO [SourceTaskOffsetCommitter-1] BaseSourceTask:503 - Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart
2025-06-03 14:56:28 INFO [task-thread-debezium-connector-ko-employee-0] ConsumerCoordinator:1056 - [Consumer clientId=next-schemahistory, groupId=next-schemahistory] Resetting generation and member id due to: consumer pro-actively leaving the group
2025-06-03 14:56:28 INFO [task-thread-debezium-connector-ko-employee-0] ConsumerCoordinator:1103 - [Consumer clientId=next-schemahistory, groupId=next-schemahistory] Request joining group due to: consumer pro-actively leaving the group
2025-06-03 14:56:28 INFO [task-thread-debezium-connector-ko-employee-0] AppInfoParser:89 - App info kafka.consumer for next-schemahistory unregistered
2025-06-03 14:56:28 ERROR [task-thread-debezium-connector-ko-employee-0] WorkerTask:234 - WorkerSourceTask{id=debezium-connector-ko-employee-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2025-06-03 14:56:28 INFO [task-thread-debezium-connector-ko-employee-0] BaseSourceTask:436 - Stopping down connector
2025-06-03 14:56:28 INFO [pool-14-thread-1] JdbcConnection:983 - Connection gracefully closed
2025-06-03 14:56:28 INFO [pool-15-thread-1] JdbcConnection:983 - Connection gracefully closed
2025-06-03 14:56:28 INFO [task-thread-debezium-connector-ko-employee-0] KafkaProducer:1367 - [Producer clientId=next-schemahistory] Closing the Kafka producer with timeoutMillis = 30000 ms.
```
This results in no topics being created, and the connector shuts down.
Configuration Details:
All services use the same SASL user (named admin) and the same configuration for auth. The relevant parts:
```yaml
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-512
sasl.jaas.config: ${secrets:admin/sasl.jaas.config}
```
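
For reference, the ${secrets:...} placeholder above is resolved by a config provider registered on the Connect worker. A minimal sketch of that registration, assuming Strimzi's KubernetesSecretConfigProvider and the provider alias "secrets" implied by the placeholder:

```yaml
# Sketch only: registering the "secrets" config provider on the Connect worker
# (assumes Strimzi's KubernetesSecretConfigProvider; adjust if you use a different provider).
config:
  config.providers: secrets
  config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
```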
The KafkaConnector config:
```yaml
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-ko-employee
  namespace: {{ .Release.Namespace }}
  labels:
    strimzi.io/cluster: {{ .Values.debezium.cluster.name }}
  annotations:
    strimzi.io/use-connector-resources: "true"
    argocd.argoproj.io/sync-wave: "6"
spec:
  class: io.debezium.connector.oracle.OracleConnector
  tasksMax: 1
  autoRestart:
    enabled: true
  config:
    oracle.connection.string: "jdbc:oracle:thin:@//<WORKING_JDBC_STRING>"
    database.hostname: "<HOST>"
    database.url: "jdbc:oracle:thin:@//<WORKING_JDBC_STRING>"
    database.port: "<PORT>"
    database.user: "<USER>"
    database.password: "<PASS>" # ... Think about using a Secret
    database.dbname: "<DB_NAME>"
    database.pdb.name: "<PDB_NAME>"
    table.include.list: "SFNKO.MITARBEITER_OUTBOX,SFNKO.MITARBEITER"
    schema.include.list: "SFNKO"
    database.include.list: "<DBs>"
    database.history.kafka.bootstrap.servers: {{ .Values.kafka.authentication.security_protocol }}://{{ .Release.Name }}-kafka-bootstrap:{{ .Values.kafka.ports.plain }}
    database.history.kafka.topic: "schema-changes"
    schema.history.internal.kafka.bootstrap.servers: {{ .Values.kafka.authentication.security_protocol }}://{{ .Release.Name }}-kafka-bootstrap:{{ .Values.kafka.ports.plain }}
    schema.history.internal.kafka.topic: "history-changes"
    database.history.store.only.captured.tables.ddl: "true"
    include.schema.changes: "false"
    topic.prefix: "next"
    snapshot.mode: "when_needed"
    log.mining.strategy: "online_catalog"
    schema.history.internal.store.only.captured.tables.ddl: "true"
    schema.history.internal.store.only.captured.databases.ddl: "true"
    errors.log.include.messages: "true"
    cdc.flattening.enabled: "true"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://{{ .Values.schema_registry.name }}:{{ .Values.schema_registry.port }}"
    key.converter.schema.registry.auto-register: "true"
    key.converter.schema.registry.find-latest: "true"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://{{ .Values.schema_registry.name }}:{{ .Values.schema_registry.port }}"
    value.converter.schema.registry.auto-register: "true"
    value.converter.schema.registry.find-latest: "true"
    schema.name.adjustment.mode: "avro"
    signal.data.collection: "<PREFIX>.SFNLNK.DEBEZIUM_SIGNAL"
    transforms: "changes,unwrap"
    transforms.changes.type: "io.debezium.transforms.ExtractChangedRecordState"
    transforms.changes.header.changed.name: "Changed"
    transforms.changes.header.unchanged.name: "Unchanged"
    transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
    transforms.unwrap.drop.tombstones: "true"
    transforms.unwrap.delete.handling.mode: "rewrite"
    transforms.unwrap.add.fields: "op"
    incremental.snapshot.chunk.size: "262144"
    max.batch.size: "16384"
    max.queue.size: "65536"
    snapshot.max.threads: "1"
    topic.creation.default.replication.factor: "1"
    topic.creation.default.partitions: "1"
    topic.creation.default.cleanup.policy: "compact"
    topic.creation.default.compression.type: "lz4"
    consumer.sasl.mechanism: {{ upper .Values.kafka.authentication.type }}
    consumer.security.protocol: {{ .Values.kafka.authentication.security_protocol }}
    producer.sasl.mechanism: {{ upper .Values.kafka.authentication.type }}
    producer.security.protocol: {{ .Values.kafka.authentication.security_protocol }}
```
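
(For completeness: the topic.creation.default.* block above only takes effect if connector-driven topic creation is enabled on the Connect worker. Kafka Connect defaults this to true; a minimal sketch of where that switch would live, assuming it is set under the KafkaConnect resource's spec.config:)

```yaml
# Worker-level switch for connector-driven topic creation (Kafka Connect default: true);
# placement under the KafkaConnect resource's spec.config is an assumption.
topic.creation.enable: "true"
```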
What I’ve Verified:

- KafkaConnect internal topics are created using the same credentials
- KafkaConnector starts but fails with metadata timeout errors

Question:
Why does the KafkaConnector fail to create its required topics (like schema-changes, history-changes, or the table change topics), while the KafkaConnect instance is able to create its internal topics using the same authentication and configuration?
Any idea what I might be missing? Could it be the Avro converter config, a network/SASL misconfiguration, or something with topic authorization?
Any help is greatly appreciated.
Kafka Connect itself can create its required topics even if auto-creation is disabled on the broker. The part you need to compare between the connector and Connect is authentication: you may be missing some Debezium-specific configs.
I compared your config with our Debezium connectors and came up with these:
"schema.history.internal.consumer.sasl.mechanism": "PLAIN",
"schema.history.internal.consumer.sasl.jaas.config": "${file:/kafka/vty/pass.properties:sasl}",
"schema.history.internal.consumer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.producer.sasl.mechanism": "PLAIN",
"schema.history.internal.producer.sasl.jaas.config": "${file:/kafka/vty/pass.properties:sasl}",
"schema.history.internal.producer.security.protocol": "SASL_PLAINTEXT",
"schema.history.internal.kafka.bootstrap.servers": "http://dbDstdkaf01.vakifbank.intra:9072"
You need to declare the security protocol for the history topic too. And since the connector both produces to and consumes from the history topic, it needs these settings for the producer and the consumer. I read the JAAS config from a file because of security concerns; you can of course change that. I think this is your exact problem, since I faced a similar issue before.
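Adapted to your SCRAM-SHA-512 setup, that could look roughly like the sketch below. The bootstrap address and the ${secrets:...} reference are carried over from your question as assumptions, not verified values:

```yaml
# Sketch only: schema history producer/consumer auth for the Debezium connector,
# adapted to SCRAM-SHA-512; bootstrap address and secret reference are assumptions.
schema.history.internal.kafka.bootstrap.servers: "kafka-kafka-bootstrap:9092"
schema.history.internal.producer.security.protocol: SASL_PLAINTEXT
schema.history.internal.producer.sasl.mechanism: SCRAM-SHA-512
schema.history.internal.producer.sasl.jaas.config: ${secrets:admin/sasl.jaas.config}
schema.history.internal.consumer.security.protocol: SASL_PLAINTEXT
schema.history.internal.consumer.sasl.mechanism: SCRAM-SHA-512
schema.history.internal.consumer.sasl.jaas.config: ${secrets:admin/sasl.jaas.config}
```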