Unable to move binlog filename offset from Kafka topic


When Debezium last ran, it committed a binlog position record at offset 0 of the offsets topic.

After that, I wrote four more records with the following commands:

$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] |  { "transaction_id": null, "ts_sec": , "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] |  { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] |  { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002376", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0
$ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] |  { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002375", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0

These are the messages in the topic:

$ kcat -b localhost:9092 -C -t debezium-events-offset-local-topic-events2 -f 'Partition(%p) %k %s\n'
Partition(0) ["debezium_mysql_connector",{"server":"debezium-cdc-events2"}] {"transaction_id":null,"ts_sec":1697625116,"file":"mysql_bin_log.002376","pos":6150164,"row":148,"server_id":1,"event":12} <-- USES THIS
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ]    { "transaction_id": null, "ts_sec": 1697532150, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ]    { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ]    { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002376", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 }
Partition(0) [ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ]    { "transaction_id": null, "ts_sec": 1697641096, "file": "mysql_bin_log.002375", "pos": 6150164, "row": 148, "server_id": 1, "event": 12 } <-- SHOULD USE THIS. RIGHT?
% Reached end of topic debezium-events-offset-local-topic-events2 [0] at offset 5
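One way to check whether these five records really share a single key is to compare the raw key bytes instead of the JSON they encode. Below is a minimal bash sketch. The function names `count_keys` and `offset_topic_keys` are hypothetical helpers; only the kcat flags (`-C`, `-e`, `-t`, `-f '%k\n'`) are real options.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count byte-identical keys read one per line on stdin.
# LC_ALL=C makes sort and uniq compare raw bytes rather than locale collation,
# so keys that differ only in whitespace are counted separately.
count_keys() {
  LC_ALL=C sort | LC_ALL=C uniq -c
}

# Hypothetical wrapper: print only the key of every record in the topic
# (-f '%k\n'), exit at the end of the partition (-e), then count the keys.
offset_topic_keys() {
  local broker="$1" topic="$2"
  kcat -b "$broker" -C -e -t "$topic" -f '%k\n' | count_keys
}
```

Run it as `offset_topic_keys localhost:9092 debezium-events-offset-local-topic-events2`. For the five records above it should report two distinct keys. One is the compact key Debezium wrote (1 record). The other is the spaced key from the echo commands (4 records), which also carries a trailing space from before the `|`.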

My understanding is that when Debezium restarts, it should read the last message in the topic to determine the binlog filename.

However, when I restart Debezium, it appears to read all the messages but then uses the first one.

Log after restarting Debezium:

org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.4.1
org.apache.kafka.clients.Metadata - [Consumer clientId=debezium-serveroffsets, groupId=null] Cluster ID: MkU3OEVBNTcwNTJENDM2Qg
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=debezium-serveroffsets, groupId=null] Assigned to partition(s): debezium-events-offset-local-topic-events2-0
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Seeking to earliest offset of partition debezium-events-offset-local-topic-events2-0
org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of offset log
org.apache.kafka.connect.util.TopicAdmin - endOffsets(): partitions=[debezium-events-offset-local-topic-events2-0]
org.apache.kafka.connect.util.TopicAdmin - endOffsets(): offsetSpecMap={debezium-events-offset-local-topic-events2-0=org.apache.kafka.clients.admin.OffsetSpec$LatestSpec@52af8aca}
org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of log offsets {debezium-events-offset-local-topic-events2-0=5}
org.apache.kafka.clients.Metadata - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting the last seen epoch of partition debezium-events-offset-local-topic-events2-0 to 0 since the associated topicId changed from null to rbKsBL63QcSrg722GNjjlg
org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting offset for partition debezium-events-offset-local-topic-events2-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}}.
org.apache.kafka.connect.util.KafkaBasedLog - Behind end offset 5 for debezium-events-offset-local-topic-events2-0; last-read offset is 0
org.apache.kafka.connect.util.KafkaBasedLog - Read to end offset 5 for debezium-events-offset-local-topic-events2-0
org.apache.kafka.connect.util.KafkaBasedLog - Finished reading KafkaBasedLog for topic debezium-events-offset-local-topic-events2
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.KafkaBasedLog - Thread[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2,5,main] started execution
org.apache.kafka.connect.util.KafkaBasedLog - Started KafkaBasedLog for topic debezium-events-offset-local-topic-events2
org.apache.kafka.connect.storage.KafkaOffsetBackingStore - Finished reading offsets topic and starting KafkaOffsetBackingStore
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): partitions=[MySqlPartition [sourcePartition={server=debezium-cdc-events2}]]
org.apache.kafka.connect.util.KafkaBasedLog - Starting read to end log for topic debezium-events-offset-local-topic-events2
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of offset log
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.TopicAdmin - endOffsets(): partitions=[debezium-events-offset-local-topic-events2-0]
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.TopicAdmin - endOffsets(): offsetSpecMap={debezium-events-offset-local-topic-events2-0=org.apache.kafka.clients.admin.OffsetSpec$LatestSpec@3e7ebdb3}
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.KafkaBasedLog - Reading to end of log offsets {debezium-events-offset-local-topic-events2-0=5}
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.KafkaBasedLog - Read to end offset 5 for debezium-events-offset-local-topic-events2-0
[KafkaBasedLog Work Thread - debezium-events-offset-local-topic-events2] INFO  org.apache.kafka.connect.util.KafkaBasedLog - Finished read to end log for topic debezium-events-offset-local-topic-events2
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): offsets={MySqlPartition [sourcePartition={server=debezium-cdc-events2}]=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql_bin_log.002376, currentBinlogPosition=6150164, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql_bin_log.002376, restartBinlogPosition=6150164, restartRowsToSkip=148, restartEventsToSkip=12, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]}
io.debezium.connector.common.BaseSourceTask - getPreviousOffsets(): Considering offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql_bin_log.002376, currentBinlogPosition=6150164, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql_bin_log.002376, restartBinlogPosition=6150164, restartRowsToSkip=148, restartEventsToSkip=12, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]
io.debezium.connector.common.BaseSourceTask - Found previous partition offset MySqlPartition [sourcePartition={server=debezium-cdc-events2}]: {transaction_id=null, file=mysql_bin_log.002376, pos=6150164, row=148, event=12}

Notice this line in the log:

last-read offset is 0

Do you know what I am doing wrong? How do I move the last-read offset? We are using Debezium 2.4.0.Final with MySQL. Thanks for your time!


UPDATE 1:

I see this log:

org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=debezium-serveroffsets, groupId=null] Resetting offset for partition debezium-events-offset-local-topic-events2-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:29092 (id: 1 rack: null)], epoch=0}}.

It looks like offset=0 is being stored somewhere. How can I move that offset using kcat or the scripts that ship with Kafka?
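Below is a minimal bash sketch for writing an offset record without stray whitespace. `offset_record` and `write_offset` are hypothetical helpers. The JSON layout copies the records already shown in the topic, and the kcat flags are the ones used in the question. The sketch assumes Debezium is stopped while you write, so it cannot commit a newer offset over yours.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print one offsets-topic record as "key|value" using
# compact JSON (no spaces anywhere), the same shape as the record Debezium wrote.
# Args: server name, binlog file, position, rows to skip, events to skip,
#       ts_sec, server_id.
offset_record() {
  local server="$1" file="$2" pos="$3" row="$4" event="$5" ts="$6" server_id="$7"
  printf '["debezium_mysql_connector",{"server":"%s"}]|{"transaction_id":null,"ts_sec":%s,"file":"%s","pos":%s,"row":%s,"server_id":%s,"event":%s}\n' \
    "$server" "$ts" "$file" "$pos" "$row" "$server_id" "$event"
}

# Hypothetical wrapper: produce that record to partition 0 with kcat,
# using "|" as the key delimiter (same flags as in the question).
write_offset() {
  local broker="$1" topic="$2"
  shift 2
  offset_record "$@" | kcat -P -b "$broker" -t "$topic" -K '|' -p 0
}
```

For example, `write_offset localhost:9092 debezium-events-offset-local-topic-events2 debezium-cdc-events2 mysql_bin_log.002375 0 0 12 1697641096 1` emits a compact record for mysql_bin_log.002375 at position 0. You can also use Kafka's own scripts. Pipe the output of `offset_record ...` into `kafka-console-producer.sh --bootstrap-server localhost:9092 --topic debezium-events-offset-local-topic-events2 --property parse.key=true --property key.separator='|'`. On a single-partition offsets topic like this one, that should have the same effect.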


Solution

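  • The whitespace in the JSON was causing the problem.

    Kafka Connect matches offset records by their exact key bytes, and Debezium looks up its key in the same compact form it writes itself (compare the first record in the listing). This command puts spaces inside the key and before the "|" delimiter, so its record is stored under a different key and is never used: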

    $ echo '[ "debezium_mysql_connector", { "server": "debezium-cdc-events2" } ] |  { "transaction_id": null, "ts_sec": , "file": "mysql_bin_log.000047", "pos": 0, "row": 0, "server_id": 1, "event": 31 }' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0
    

    After I removed all the spaces from the JSON, Debezium successfully reset to that offset. Here is a sample command that worked:

    $ echo '["debezium_mysql_connector",{"server":"debezium-cdc-events2"}]|{"transaction_id":null,"ts_sec":1697641096,"file":"mysql_bin_log.002375","pos":0,"row":0,"server_id":1,"event":12}' | kcat -P -b localhost:9092  -t debezium-events-offset-local-topic-events2 -K \| -p 0
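The reason the spaced records were ignored can be modelled in a few lines. At startup, Kafka Connect's `KafkaOffsetBackingStore` reads the whole offsets topic; the "Seeking to earliest offset" and "Read to end offset 5" log lines show this. It keeps the last value seen for each raw key. The awk sketch below stands in for that Java store and is not Debezium code. `resolve_offset` is a hypothetical name.

```shell
#!/usr/bin/env bash
# Sketch only: models the offset store as "last value wins per exact key string".
# Reads "key|value" lines (in topic order) on stdin and prints the value the
# store would hold for the key given as $1; prints nothing if that exact key
# never appears.
resolve_offset() {
  LC_ALL=C WANT="$1" awk '
    {
      i = index($0, "|")          # split on the first "|" only
      if (i == 0) next            # skip lines without a key delimiter
      latest[substr($0, 1, i - 1)] = substr($0, i + 1)   # later records overwrite
    }
    END {
      want = ENVIRON["WANT"]      # ENVIRON avoids awk -v escape processing
      if (want in latest) print latest[want]
    }
  '
}
```

Dump the original five-record topic with `kcat -b localhost:9092 -C -e -t debezium-events-offset-local-topic-events2 -f '%k|%s\n'` and pipe it into `resolve_offset '["debezium_mysql_connector",{"server":"debezium-cdc-events2"}]'`. This should print the value of the first record, the mysql_bin_log.002376 offset Debezium loaded. After the compact command above has been run, it should print the mysql_bin_log.002375 record instead.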