exceptionapache-kafkaapache-kafka-streamsexceptionhandleruncaughtexceptionhandler

Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD vs SHUTDOWN_CLIENT


I have an old topic which contains corrupted messages and I need to fully reprocess this topic ignoring the unprocessable messages. Which is the right uncaught exception handling strategy for this scenario?

I'm not able to fully understand the differences between REPLACE_THREAD and SHUTDOWN_CLIENT strategies for uncaught exception handling on a KStreams application. Any explanations or references are appreciated.


Solution

  • Since I've been confused as well in the past so I did some research and best I've found is something in the error-handling tutorials confluent offers on their site.

    REPLACE_THREAD - Replaces the thread receiving the exception and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application’s processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)

    SHUTDOWN_CLIENT - Shut down the individual instance of the Kafka Streams application experiencing the exception. (This is the previous behavior and the current default behavior if you don’t provide a StreamsUncaughtExceptionHandler)

    SHUTDOWN_APPLICATION - Shut down all instances of a Kafka Streams application with the same application-id. Kafka Streams uses a rebalance to instruct all application instances to shutdown, so even those running on another machine will receive the signal and exit.

    find further details here https://developer.confluent.io/tutorials/error-handling/confluent.html. You'll find one more mode there which is SHUTDOWN_APPLICATION. But my advice is to solve that differently.

    I faced similar issues in the past with corrupt data. I remember XML messages in my JSON topic and manually inserted test data which didn't satisfy any schema ever appointed which was leading to crashing applications multiple times a day. I can recommend following two solutions bringing more stability to your kstreams application if you want it to survive in case of corrupt consumptions:

    create a bad record filter

    after initializing you kafka stream in your code you'll have some of your transformations executed by map functions similar to the example here

    KStream<String, String> inputStream = builder.stream("input-topic");
    KStream<String, String> firstMappedStream = inputStream
      .map((key, value) -> { return <your-transformed-message>;}
    );
    

    before mapping and transformation try a dedicated filter or mapper which is doing all the safety logic. the next map will get cleansed records only. all the corrupt records will be discarded in the map / filter before. you can implement a log as well if you want to. solution would be something like the following

    KStream<String, String> inputStream = builder.stream("input-topic");
    KStream<String, String> firstMappedStream = inputStream
      .filter((key, value) -> { return <your-validation-result>})
      .map((key, value) -> { return <your-transformed-message>;}
    );
    
    stabilize your transformation

    since you can't think of every possible way of corruption an additional use of exception handling in your mappers is a good fit to build stable applications. my advice is to surround your mapped logic in a try-catch block and let the map function return null for the corrupt records. that way they won't be passed to the next map function and your application will be running stable no matter what.

    hope that helps