When Debezium was last running, it committed a binlog offset message at offset 0 of the offsets topic.
After that, I produced four more messages with the following commands:
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] | { "transaction_id": null, "ts_sec": , "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] | { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] | { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002376", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] | { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002375", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
These are the messages in the topic:
$ kcat -b localhost:9092 -C -t debezium-events-offset-local-topic-events2 -f 'Partition(%p) %k %s\n'
Partition(0) ["debezium_mysql_connector",{"server":"debezium-cdc-events2"}] {"transaction_id":null,"ts_sec":1697625116,"file":"mysql_bin_log.002376","pos":6150164,"row":148,"server_id":1,"event":12} <-- USES THIS
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] { "transaction_id": null, "ts_sec": 1697532150, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002376", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002375", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 } <-- SHOULD USE THIS. RIGHT?
% Reached end of topic debezium-events-offset-local-topic-events2 [0] at offset 5
My understanding is that, when Debezium restarts, it should use the last message in the topic to determine the binlog filename and position.
However, when I restart Debezium, it appears to read all the messages but uses the first one.
Log after restarting Debezium:
org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.4.1
org.apache.kafka.clients.Metadata - [Consumer clientId=debezium-serveroffsets, groupId=null] Cluster ID: MkU3OEVBNTcwNTJENDM2Qg
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=debezium-serveroffsets, groupId=null] Assigned to partition(s): debezium-events-offset-local-topic-events2-0
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Seeking to earliest offset of partition debezium-events-offset-local-topic-events2-0
org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of offset log
org.apache.kafka.connect.util.TopicAdmin - endOffsets(): partitions=[debezium-events-offset-local-topic-events2-0]
org.apache.kafka.connect.util.TopicAdmin - endOffsets(): offsetSpecMap={debezium-events-offset-local-topic-events2-0=org.apache.kafka.clients.admin.OffsetSpec$LatestSpec@52af8aca}
org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of log offsets {debezium-events-offset-local-topic-events2-0=5}
org.apache.kafka.clients.Metadata - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting the last seen epoch of partition debezium-events-offset-local-topic-events2-0 to 0 since the associated topicId changed from null to rbKsBL63QcSrg722GNjjlg
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting offset for partition debezium-events-offset-local-topic-events2-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}}.
org.apache.kafka.connect.util.KafkaBasedLog - Behind end offset 5 for debezium-events-offset-local-topic-events2-0; last-read offset is 0
org.apache.kafka.connect.util.KafkaBasedLog - Read to end offset 5 for debezium-events-offset-local-topic-events2-0
org.apache.kafka.connect.util.KafkaBasedLog - Finished reading KafkaBasedLog for topic debezium-events-offset-local-topic-events2
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.KafkaBasedLog - Thread[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2,5,main] started execution
org.apache.kafka.connect.util.KafkaBasedLog - Started KafkaBasedLog for topic debezium-events-offset-local-topic-events2
org.apache.kafka.connect.storage.KafkaOffsetBackingStore - Finished reading offsets topic and starting KafkaOffsetBackingStore
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): partitions=[MySqlPartition [sourcePartition={server=debezium-cdc-events2}]]
org.apache.kafka.connect.util.KafkaBasedLog - Starting read to end log for topic debezium-events-offset-local-topic-events2
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of offset log
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.TopicAdmin - endOffsets(): partitions=[debezium-events-offset-local-topic-events2-0]
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.TopicAdmin - endOffsets(): offsetSpecMap={debezium-events-offset-local-topic-events2-0=org.apache.kafka.clients.admin.OffsetSpec$LatestSpec@3e7ebdb3}
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of log offsets {debezium-events-offset-local-topic-events2-0=5}
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.KafkaBasedLog - Read to end offset 5 for debezium-events-offset-local-topic-events2-0
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO org.apache.kafka.connect.util.KafkaBasedLog - Finished read to end log for topic debezium-events-offset-local-topic-events2
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): offsets={MySqlPartition [sourcePartition={server=debezium-cdc-events2}]=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql_bin_log.002376, currentBinlogPosition=6150164, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql_bin_log.002376, restartBinlogPosition=6150164, restartRowsToSkip=148, restartEventsToSkip=12, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]}
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): Considering offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql_bin_log.002376, currentBinlogPosition=6150164, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql_bin_log.002376, restartBinlogPosition=6150164, restartRowsToSkip=148, restartEventsToSkip=12, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
io.debezium.connector.common.BaseSourceTask - Found previous partition offset MySqlPartition [sourcePartition={server=debezium-cdc-events2}]: {transaction_id=null, file=mysql_bin_log.002376, pos=6150164, row=148, event=12}
Notice this line in the log:
last-read offset is 0
Do you know what I am doing wrong? How do I move the last-read offset? We are using Debezium 2.4.0.Final with MySQL. Thanks for your time!
UPDATE 1:
I also see this log line:
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting offset for partition debezium-events-offset-local-topic-events2-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}}.
It looks like offset=0 is being saved somewhere. How would I move this offset using kcat or the scripts that ship with Kafka?
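The `last-read offset is 0` line most likely just reports where the offsets consumer started: it seeks to the earliest offset, and the following `Read to end offset 5` line shows it then read all five messages. As far as I understand Kafka Connect's `KafkaOffsetBackingStore` (used here, judging by the log), it replays the whole topic into an in-memory map keyed by the raw key bytes, so "last message wins" only applies among messages whose keys are byte-for-byte identical. The message at offset 0, written by Debezium itself, has the compact key `["debezium_mysql_connector",{"server":"debezium-cdc-events2"}]`; keys with extra spaces (including the space before the `|` delimiter, which kcat keeps as part of the key) land in separate entries that Debezium never looks up. The sketch below is a hypothetical bash simulation of that replay, not Debezium or Kafka code: `resolve_offset` is an illustrative name, it needs bash 4+ for associative arrays, and it ignores tombstones (null values), which the real store treats as deletes.

```shell
#!/usr/bin/env bash
# Hypothetical simulation of replaying an offsets topic on startup:
# each "key|value" line is stored under its exact key string, so a later
# message only overrides an earlier one when the keys are identical.
# Requires bash 4+ (associative arrays). Tombstones are not modelled.
resolve_offset() {
  local wanted_key="$1" line key value
  local -A store=()
  # Read every record from stdin, like reading the topic from offset 0 to the end.
  # IFS= and -r keep leading/trailing whitespace, which is part of the key.
  while IFS= read -r line || [ -n "$line" ]; do
    key=${line%%|*}      # everything before the first '|'
    value=${line#*|}     # everything after the first '|'
    store["$key"]=$value # last write for this exact key wins
  done
  # Look up the exact key; print an empty line if it was never written.
  printf '%s\n' "${store["$wanted_key"]-}"
}
```

To check the real topic for this, printing each key's length next to it makes stray whitespace visible, for example:
$ kcat -C -b localhost:9092 -t debezium-events-offset-local-topic-events2 -e -f 'Offset(%o) key_len=%K %k\n'
Here `%K` is kcat's key-length format token and `-e` makes kcat exit at the end of the topic.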
The whitespace was causing the issue.
This command is problematic because its JSON contains spaces:
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] | { "transaction_id": null, "ts_sec": , "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
When I remove all the spaces from the JSON, Debezium successfully resets to that offset. Here is a sample command that worked:
$ echo '["debezium_mysql_connector",{"server":"debezium-cdc-events2"}]|{"transaction_id":null,"ts_sec":1697641096,"file":"mysql_bin_log.002375","pos":0,"row":0,"server_id":1,"event":12}' | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0
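Hand-editing the JSON makes it easy to reintroduce a space, so a small shell helper can build the line instead. This is a minimal sketch under stated assumptions: `offset_record` is a hypothetical name, the key layout is copied from the message Debezium itself wrote at offset 0, `server_id` is fixed at 1 as in the commands above, and `row` and `event` are taken to be the values that appear as `restartRowsToSkip` and `restartEventsToSkip` in the restart log. Arguments are inserted without JSON escaping, so they must not contain quotes or backslashes.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one "key|value" line for kcat -K '|' using
# compact JSON (no spaces), so the key matches the one Debezium writes.
# Arguments: server name, binlog file, position, row, event, ts_sec.
# server_id is hard-coded to 1 as in the commands above; values are not escaped.
offset_record() {
  local server="$1" file="$2" pos="$3" row="$4" event="$5" ts="$6"
  printf '["debezium_mysql_connector",{"server":"%s"}]|{"transaction_id":null,"ts_sec":%s,"file":"%s","pos":%s,"row":%s,"server_id":1,"event":%s}\n' \
    "$server" "$ts" "$file" "$pos" "$row" "$event"
}
```

With the helper defined in the current shell, the working command above becomes:
$ offset_record debezium-cdc-events2 mysql_bin_log.002375 0 0 12 "$(date +%s)" | kcat -P -b localhost:9092 -t debezium-events-offset-local-topic-events2 -K \| -p 0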