I'm trying to set up the schema-registry-transfer SMT (https://github.com/OneCricketeer/schema-registry-transfer-smt) with MirrorMaker 2 (MM2).
The first run works: a schema is created in the target registry and the messages in the replicated topic can be read correctly.
But then replication stops, and new messages no longer arrive on the target cluster.
After disabling the schema-transfer SMT, message replication starts working again, but, as expected without the SMT, consumers fail to deserialize the events because the records still carry the source registry's schema IDs.
We are on Kafka 2.8 / Confluent Platform 6.0, using MM2 for one-way A->B replication.
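For context on why the consumer fails without the SMT: a Confluent-serialized Avro record starts with a 5-byte header, a magic byte 0x00 followed by the schema ID as a big-endian 4-byte integer. A consumer on the target cluster reads that embedded ID and looks it up in its *own* registry, so the lookup fails whenever the ID only exists in the source registry. A minimal sketch of that header check (hypothetical payload and registry contents, no real registry calls):

```python
import struct

def parse_wire_header(payload: bytes) -> int:
    """Return the schema ID embedded in a Confluent-wire-format payload."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("Unknown magic byte!")  # same condition the deserializer checks
    # Bytes 1-4 are the schema ID, big-endian.
    return struct.unpack(">I", payload[1:5])[0]

# Hypothetical record produced against the source registry (schema ID 1):
record = b"\x00\x00\x00\x00\x01" + b"avro-body"
source_id = parse_wire_header(record)  # -> 1

# A consumer on the target cluster asks *its* registry for this ID;
# if the target registry assigned different IDs, the lookup fails.
target_registry = {42: "com.example.TestValue"}  # hypothetical target-side IDs
schema_known = source_id in target_registry     # -> False
```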
MirrorMaker 2 settings
/etc/kafka/connect-mirror-maker.properties:
clusters=source, target-stage
source.bootstrap.servers=broker-src.net:9091
target-stage.bootstrap.servers=broker-target:9091
source->target-stage.enabled=True
source->target-stage.topics=test-mm-.*
topics.blacklist=.*[\\-\\.]internal, .*\\.replica, __.*
target-stage->source.enabled=False
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
replication.factor=1
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
source->target-stage.transforms=avroSchemaTransfer
source->target-stage.transforms.avroSchemaTransfer.transfer.message.keys=false
source->target-stage.transforms.avroSchemaTransfer.src.schema.registry.url=http://schema-reg-src.net:8081
source->target-stage.transforms.avroSchemaTransfer.src.basic.auth.credentials.source=USER_INFO
source->target-stage.transforms.avroSchemaTransfer.src.basic.auth.user.info=user:pass
source->target-stage.transforms.avroSchemaTransfer.dest.schema.registry.url=http://schema-reg-target.net:8081
source->target-stage.transforms.avroSchemaTransfer.dest.basic.auth.credentials.source=USER_INFO
source->target-stage.transforms.avroSchemaTransfer.dest.basic.auth.user.info=user:pass
source->target-stage.transforms.avroSchemaTransfer.type=cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer
The logs show these errors:
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Caused by: cricket.jmoore.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 1 not found; error code: 40403
I can't figure out what causes the magic-byte error, or which component needed schema ID 1 and where it was looking it up.
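My understanding of what the SMT does per record, as a rough sketch: read the embedded ID, fetch that schema from the source registry, register it in the destination registry, and rewrite the header with the destination's ID. The stand-in dict registries below are purely illustrative (the real SMT uses the Schema Registry REST client), but they show where both errors in the logs can come from:

```python
import struct

MAGIC = 0

def transfer_schema(payload: bytes, src_registry: dict, dest_registry: dict) -> bytes:
    """Rewrite the schema ID in a wire-format payload from source to destination."""
    if payload[0] != MAGIC:
        raise ValueError("Unknown magic byte!")
    src_id = struct.unpack(">I", payload[1:5])[0]
    try:
        schema = src_registry[src_id]
    except KeyError:
        # corresponds to "Schema <id> not found; error code: 40403"
        raise LookupError(f"Schema {src_id} not found in source registry")
    # Register (idempotently) in the destination and use its ID there.
    dest_id = next((i for i, s in dest_registry.items() if s == schema), None)
    if dest_id is None:
        dest_id = max(dest_registry, default=0) + 1
        dest_registry[dest_id] = schema
    return bytes([MAGIC]) + struct.pack(">I", dest_id) + payload[5:]

src = {1: '{"type":"record","name":"Test"}'}   # hypothetical source registry
dest = {}                                       # empty target registry
out = transfer_schema(b"\x00\x00\x00\x00\x01body", src, dest)
# out now starts with the destination's ID (1 here, since dest was empty)
```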
MirrorMaker 2 log excerpt
/var/log/kafka/connect-mirror-maker.log:
[2022-12-29 15:50:17,618] INFO Initializing: org.apache.kafka.connect.runtime.TransformationChain{cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer} (org.apache.kafka.connect.runtime.Worker:606)
[2022-12-29 15:50:18,365] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2022-12-29 15:50:18,438] INFO [Consumer clientId=consumer-null-12, groupId=null] Cluster ID: tIsZUjsuRvm3HYsWU0lUsA (org.apache.kafka.clients.Metadata:279)
[2022-12-29 15:50:18,536] INFO WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:50:18,536] INFO WorkerSourceTask{id=MirrorSourceConnector-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:50:18,537] ERROR WorkerSourceTask{id=MirrorSourceConnector-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2022-12-29 15:50:18,601] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2022-12-29 15:50:18,602] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192)
[2022-12-29 15:50:18,604] INFO [Producer clientId=producer-9] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2022-12-29 15:50:18,608] INFO Stopping task-thread-MirrorSourceConnector-0 took 6 ms. (org.apache.kafka.connect.mirror.MirrorSourceTask:120)
[2022-12-29 15:50:18,608] INFO [Producer clientId=target-stage-producer] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2022-12-29 15:50:22,701] ERROR Unable to fetch schema id 1 in source registry for record value (cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer:159)
[2022-12-29 15:50:22,701] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:50:22,706] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:50:22,706] ERROR WorkerSourceTask{id=MirrorHeartbeatConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: cricket.jmoore.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 1 not found; error code: 40403
at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.translateRegistrySchema(SchemaRegistryTransfer.java:163)
at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.updateKeyValue(SchemaRegistryTransfer.java:113)
at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.apply(SchemaRegistryTransfer.java:72)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 11 more
Caused by: cricket.jmoore.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema 1 not found; error code: 40403
at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292)
at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:660)
at cricket.jmoore.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:642)
at cricket.jmoore.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:225)
at cricket.jmoore.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:299)
at cricket.jmoore.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:284)
at cricket.jmoore.kafka.connect.transforms.SchemaRegistryTransfer.translateRegistrySchema(SchemaRegistryTransfer.java:157)
... 16 more
[2022-12-29 15:50:22,706] ERROR WorkerSourceTask{id=MirrorHeartbeatConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:192)
[2022-12-29 15:50:22,707] INFO [Producer clientId=target-stage-producer] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2022-12-29 15:51:06,669] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:51:06,670] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:51:06,705] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:51:06,705] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:51:06,711] INFO Stopping task-thread-MirrorCheckpointConnector-0 took 6 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:98)
[2022-12-29 15:51:06,711] INFO [Producer clientId=target-stage-producer] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1189)
[2022-12-29 15:51:06,784] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 115 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
[2022-12-29 15:51:12,501] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:51:12,502] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:51:17,691] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:51:17,691] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 15:51:17,713] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 15:51:17,714] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 17:00:07,890] INFO Found 0 new topic-partitions on source. Found 0 deleted topic-partitions on source. Found 97 topic-partitions missing on target-stage. (org.apache.kafka.connect.mirror.MirrorSourceConnector:241)
[2022-12-29 17:10:07,976] INFO Found 0 new topic-partitions on source. Found 0 deleted topic-partitions on source. Found 97 topic-partitions missing on target-stage. (org.apache.kafka.connect.mirror.MirrorSourceConnector:241)
[2022-12-29 17:16:07,140] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 17:16:07,140] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 17:16:07,142] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:586)
[2022-12-29 17:16:12,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 17:16:12,556] INFO WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 17:16:17,831] INFO WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 17:16:17,832] INFO WorkerSourceTask{id=MirrorSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 17:16:17,833] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 17:16:17,833] INFO WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
[2022-12-29 17:16:17,833] INFO WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:488)
[2022-12-29 17:16:17,833] INFO WorkerSourceTask{id=MirrorSourceConnector-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)
Thanks for using my SMT.
The "Unknown magic byte" error occurs when a record in your topic isn't a valid Confluent-serialized payload, i.e. its value doesn't start with the 0x00 magic byte.
The "schema not found" error can only come from the source registry, as the error says, since that's where the SMT copies schemas from.
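Since "Unknown magic byte" means some record's first byte isn't 0x00, one way to find the offending records is to consume the topic with a raw byte deserializer and flag any value that isn't wire-format shaped. A self-contained sketch of just the check (in practice you'd feed it the raw values from a consumer configured with `ByteArrayDeserializer`):

```python
def find_bad_payloads(records):
    """Yield (index, first_byte) for records that would trigger 'Unknown magic byte!'.

    A valid Confluent-serialized record is at least 5 bytes and starts with 0x00.
    """
    for i, value in enumerate(records):
        if value is None:
            continue  # tombstones have no payload to check
        if len(value) < 5 or value[0] != 0:
            yield i, (value[0] if value else None)

# Hypothetical sample: one good Avro record, one plain-JSON record, one tombstone
sample = [b"\x00\x00\x00\x00\x01...", b'{"plain":"json"}', None]
bad = list(find_bad_payloads(sample))  # -> [(1, 123)]  (123 == ord('{'))
```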
"After disabling the SMT schema transfer, message replication starts working again"

Okay, but can you deserialize those events successfully with a consumer?