postgresql, debezium, cdc, debezium-engine, debezium-server

Debezium standalone server with HTTP sink


We are running the Debezium standalone server to capture changes from a PostgreSQL database, and when a change is captured we want it sent to an HTTP endpoint. This works fine, but I can't figure out how to disable the schema for the key in the message payload. Here is the configuration:

# source configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=db.host.com
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=helloworld
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=test-prefix
debezium.source.plugin.name=pgoutput


debezium.format.value=json
debezium.format.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.format.value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

#debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=public
quarkus.log.console.json=false

# sink configuration
debezium.sink.type=http
debezium.sink.http.url=http://localhost:3000/endpoint
debezium.sink.http.timeout.ms=30000
debezium.sink.http.retry.interval.ms=30000

Logs:


                      Powered by Quarkus 3.2.12.Final
2024-07-01 11:52:18,516 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$283/0x0000000800f69bf0@4cb702ce' stream name mapper
2024-07-01 11:52:18,550 INFO  [io.deb.ser.htt.HttpChangeConsumer] (main) Using http content-type type application/json
2024-07-01 11:52:18,551 INFO  [io.deb.ser.htt.HttpChangeConsumer] (main) Using sink URL: http://localhost:3000/endpoint
2024-07-01 11:52:18,551 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.http.HttpChangeConsumer' instantiated
2024-07-01 11:52:18,588 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = header
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-01 11:52:18,589 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-01 11:52:18,589 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = true

2024-07-01 11:52:18,605 INFO  [io.deb.emb.EmbeddedWorkerConfig] (main) EmbeddedWorkerConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    auto.include.jmx.reporter = true
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 60000
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = data/offsets.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic =
    plugin.discovery = hybrid_warn
    plugin.path = null
    response.http.headers.config =
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2024-07-01 11:52:18,608 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = false

2024-07-01 11:52:18,609 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    replace.null.with.default = true
    schemas.cache.size = 1000
    schemas.enable = false

I have also tried prefixing these properties with debezium.format., but the messages received at the HTTP server still contain the schema.
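
For illustration, every key arriving at the endpoint is still wrapped in a schema envelope along these lines (a sketch for a hypothetical customers table with an integer id column; the exact fields depend on the table):

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" }
        ],
        "optional": false,
        "name": "test-prefix.public.customers.Key"
      },
      "payload": { "id": 42 }
    }

What I want delivered is just the payload part, i.e. {"id": 42}.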


Solution

  • The settings need the debezium.source. prefix:

    debezium.source.key.converter.schemas.enable=false
    debezium.source.value.converter.schemas.enable=false
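
Applied to the source configuration above, the format section becomes (a sketch, assuming the json format, which is the default, for both key and value):

    debezium.format.key=json
    debezium.format.value=json
    debezium.source.key.converter.schemas.enable=false
    debezium.source.value.converter.schemas.enable=false

The debezium.source. prefix is what matters: properties carrying it are passed through to the embedded engine, where the converters that serialize the outgoing records are configured. That would also explain why the unprefixed key.converter.schemas.enable lines in the original configuration had no effect. With this in place, the endpoint should receive only the payload, e.g. {"id": 42} for the key.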