
Debezium error, schema isn't known to this connector


I have a project using Debezium, mostly based on this example, which is connected to Apache Pulsar.

I have changed a few configuration settings. The file now looks like this:

database.history=io.debezium.relational.history.MemoryDatabaseHistory
connector.class=io.debezium.connector.mysql.MySqlConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=offset.dat
offset.flush.interval.ms=5000
name=mysql-dbz-connector
database.hostname={ip}
database.port=3308
database.user={user}
database.password={pass}
database.dbname=database
database.server.name=test
table.whitelist=database.history_table,database.project_table
snapshot.mode=schema_only
schemas.enable=false
include.schema.changes=false
pulsar.topic=persistent://public/default/{0}
pulsar.broker.address=pulsar://{ip}:6650

What I'm trying to do is monitor changes to history_table and project_table in the database and write the resulting payloads to Apache Pulsar.

My problem is as follows. Whatever snapshot mode I use, once an offset has been written, I can't restart Debezium without getting this error on the next database update:

Encountered change event for table database.history_table whose schema isn't known to this connector

It only happens with an existing offset.dat file. I think this is because the schema is null within the offset.dat file. Take this one for example:

¨Ìsrjava.util.HashMap⁄¡√`—F
loadFactorI thresholdxp?@wur[B¨Û¯T‡xpG{"schema":null,"payload":["mysql-dbz-connector",{"server":"test"}]}uq~U{"ts_sec":1563802215,"file":"database-bin.000005","pos":79574,"server_id":1,"event":1}x

I first suspected the schemas.enable=false or the include.schema.changes=false parameters that I used to make the JSON more concise, but their values don't change anything in the offset.dat file.


Solution

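  • The problem lies in the line database.history=io.debezium.relational.history.MemoryDatabaseHistory. An in-memory history does not survive a restart, while the offsets in offset.dat do. After a restart the connector resumes from the stored binlog position without taking a new snapshot, so it no longer has the table schemas and fails on the first change event. The offset file only stores the binlog position, not the table schemas, so nothing in offset.dat itself is wrong. Use FileDatabaseHistory instead, and give it a file path through database.history.file.filename.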
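
A minimal sketch of the change, assuming the rest of the file stays as in the question. The file name dbhistory.dat is a placeholder chosen for illustration and is not from the original post; any path the process can write to should work.

```properties
# Keep the schema history on disk so it survives a restart.
# This replaces the MemoryDatabaseHistory setting from the question.
database.history=io.debezium.relational.history.FileDatabaseHistory
# Placeholder path (assumption, not from the question).
database.history.file.filename=dbhistory.dat
```

The existing offset.dat was written without a matching history file, so it will probably need to be deleted once before the first start with this configuration. That way the offsets and the history start out consistent with each other.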