apache-kafka-streams confluent-schema-registry ktable

KafkaStreams, Error Registering Avro Schema


Disclaimer: My experience with KafkaStreams is quite limited. I do not understand why I am getting an org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: and Schema being registered is incompatible with an earlier schema; error when all I'm doing is streaming a topic into a KTable so that I can later use Interactive Queries on that store.

Here's the SerDes config:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

Streams code. Note that I do not wish to do any filtering or grouping on the stream at this point; I just want the data to be available for future querying from a store.

final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));

Whenever I call streams.start() I keep getting those exceptions while serialising. I have a schema for this data, so I also tried the SpecificAvroSerde, but the problem is the same. I guess I am missing some fundamental understanding of why my KTable is attempting to register a new schema with the Confluent Schema Registry.

Edit 1: I now understand the role played by the Schema Registry here. Using the KStream with a GenericAvroSerde I can consume the data from the topic, but I am still unable to materialize it in the KTable. My questions now are:

  1. Why do I keep getting the above exception at the same partition and offset every time, even though I am not calling streams.cleanUp()? Why isn't it moving on (committing)?
  2. This exception seems to be unrecoverable. All the Streams threads die, bringing the app down. Is there a way to circumvent this? Note: I'm already using the LogAndContinue exception handlers for Production and Deserialization.

Edit 2:

I was able to overcome that exception. My StateStore contained previous entries with an incompatible schema. After I Purged the Topic and changed the ApplicationId it started to work.

This still doesn't negate the need to trap the Schema being registered is incompatible with an earlier schema; exception, though. It brings the Streams application to a halt. I have tried using streams.setUncaughtExceptionHandler, where I'm able to log the error; however, this doesn't prevent the Streams threads from dying, and I can't even start them again after that. Surely there's a way to trap this?


Solution

  • The exception you describe is fatal, and at the moment you cannot prevent the thread from dying: the thread cannot make any progress and thus gives up. Only after you have fixed the issue can you restart the application (as you observed).

    Btw: why do you not simply use builder.table() to read the topic?
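
To illustrate the "cannot make any progress" point, and why the failure shows up at the same partition and offset after every restart: an offset is committed only after its record has been processed successfully, so a record that fails every time is picked up again on the next start. Below is a toy model in plain Java. It uses no Kafka classes, the class and method names are hypothetical, and it is a sketch of the idea rather than how Kafka Streams is implemented internally.

```java
import java.util.List;

class StuckOffsetDemo {

    // Stand-in for the step that fails (here: any record marked "bad").
    static void process(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalStateException("incompatible schema: " + record);
        }
    }

    // One run of a toy stream thread. It starts at the committed offset,
    // advances the offset only after a record was processed successfully,
    // and stops ("the thread dies") on the first failure.
    // Returns the committed offset at the end of the run.
    static int runOnce(List<String> partition, int committedOffset) {
        int offset = committedOffset;
        try {
            while (offset < partition.size()) {
                process(partition.get(offset));
                offset++; // "commit": only reached when process() succeeded
            }
        } catch (IllegalStateException e) {
            System.out.println("thread died at offset " + offset + ": " + e.getMessage());
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("ok-0", "ok-1", "bad-2", "ok-3");
        int committed = 0;
        // Restarting does not help: every run resumes at the failing record.
        for (int restart = 1; restart <= 3; restart++) {
            committed = runOnce(partition, committed);
            System.out.println("restart " + restart + ", committed offset = " + committed);
        }
    }
}
```

In this model every restart stops at offset 2 again, and only removing or fixing whatever makes that step fail lets later runs get past it. That is consistent with the application working again only once the incompatibility itself was gone.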