We are running Debezium Server standalone to capture changes from a PostgreSQL database, and when a change is captured we want it sent to an HTTP endpoint. This works fine, but I can't figure out how to disable the schema for the key in the message payload. Here is the configuration:
# source configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=db.host.com
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=helloworld
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=test-prefix
debezium.source.plugin.name=pgoutput
debezium.format.value=json
debezium.format.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.format.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
#debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=public
quarkus.log.console.json=false
# sink configuration
debezium.sink.type=http
debezium.sink.http.url=http://localhost:3000/endpoint
debezium.sink.http.timeout.ms=30000
debezium.sink.http.retry.interval.ms=30000
Logs:
Powered by Quarkus 3.2.12.Final
2024-07-01 11:52:18,516 INFO [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$283/0x0000000800f69bf0@4cb702ce' stream name mapper
2024-07-01 11:52:18,550 INFO [io.deb.ser.htt.HttpChangeConsumer] (main) Using http content-type type application/json
2024-07-01 11:52:18,551 INFO [io.deb.ser.htt.HttpChangeConsumer] (main) Using sink URL: http://localhost:3000/endpoint
2024-07-01 11:52:18,551 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.http.HttpChangeConsumer' instantiated
2024-07-01 11:52:18,588 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = header
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = true
2024-07-01 11:52:18,589 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = true
2024-07-01 11:52:18,589 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = true
2024-07-01 11:52:18,605 INFO [io.deb.emb.EmbeddedWorkerConfig] (main) EmbeddedWorkerConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
auto.include.jmx.reporter = true
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
config.providers = []
connector.client.config.override.policy = All
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = [http://:8083]
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.discovery = hybrid_warn
plugin.path = null
response.http.headers.config =
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
ssl.cipher.suites = null
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
task.shutdown.graceful.timeout.ms = 5000
topic.creation.enable = true
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2024-07-01 11:52:18,608 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
2024-07-01 11:52:18,609 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
I have also tried adding the debezium.format. prefix, but the message received at the HTTP server still contains the schema.
The settings are:
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
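
For comparison, here is a minimal sketch of the pass-through form. It assumes that Debezium Server forwards options under debezium.format.key.* and debezium.format.value.* to the key and value converters, as its documentation describes for the format settings. It has not been tested against the setup above. Note that the log shows two sets of JsonConverterConfig values: the first with schemas.enable = true and the later one with schemas.enable = false. This suggests that the settings tried so far reach only one of the two converter sets.

```properties
# Assumption: debezium.format.key.* / debezium.format.value.* are passed
# through to the respective converter, so no "converter." segment is used.
debezium.format.key=json
debezium.format.value=json
debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false
```

If this form takes effect, every JsonConverterConfig block in the startup log with converter.type = key or converter.type = value should report schemas.enable = false.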