There are very few examples of the different error-handling methods in Spring Cloud Stream, and the partial examples that the documentation does provide don't seem to work either.
I have a test repository that attempts several methods of capturing errors, and none of them work.
Spring Cloud Stream has reliable deserialization and serialization error handling, but error handling for exceptions thrown from map, transform and processor methods is very under-documented.
Repository for samples: https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling
I have only two main files:
@SpringBootApplication
public class ErrorHandlingApplication {

    public final Random randomNumberGenerator = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        SpringApplication.run(ErrorHandlingApplication.class, args);
    }

    @Bean
    public Supplier<Message<String>> randomIntegerPublisher() {
        return () -> MessageBuilder
                .withPayload(String.valueOf(randomNumberGenerator.nextInt()))
                .setHeader(KafkaHeaders.RECEIVED_KEY, 0)
                .build();
    }

    @Bean
    public Consumer<KStream<String, String>> errorStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on every other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                    throw new RuntimeException("Pushing uncaught error to kill stream!");
                });
    }

    @Bean
    public Consumer<KStream<String, String>> errorHandledStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on every other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                    System.out.println("This should not kill the stream");
                    throw new RuntimeException("Publishing error to be caught!");
                });
    }

    @Bean
    // TODO - doesn't seem to be working, is this because we're using kstreams?
    public Consumer<ErrorMessage> defaultErrorHandler() {
        return v -> {
            System.out.println("Caught and handling error");
            System.out.println(v.toString());
        };
    }

    @Bean
    // TODO - not working via the config
    /**
     * bindings:
     *   errorHandledStream-in-0:
     *     consumer:
     *       commonErrorHandlerBeanName: defaultCommonErrorHandler
     */
    public CommonErrorHandler defaultCommonErrorHandler() {
        return new CommonLoggingErrorHandler();
    }

    /**
     * Also not working
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(defaultCommonErrorHandler());
        return factory;
    }
}
and
spring:
  cloud:
    function:
      definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
    stream:
      default:
        error-handler-definition: defaultErrorHandler
      kafka:
        streams:
          binder:
            deserialization-exception-handler: logandcontinue
          bindings:
            errorHandledStream-in-0:
              error-handler-definition: defaultErrorHandler
              consumer:
                commonErrorHandlerBeanName: defaultCommonErrorHandler
        bindings:
          errorHandledStream-in-0:
            consumer:
              commonErrorHandlerBeanName: defaultCommonErrorHandler
      bindings:
        randomIntegerPublisher-out-0:
          destination: integer-topic
        errorStream-in-0:
          destination: integer-topic
        errorHandledStream-in-0:
          destination: integer-topic
          error-handler-definition: defaultErrorHandler
Pretty much every documented variation of error handling does not seem to function correctly.
My first stream, errorStream, acts as expected: it kills the relevant consumer (although the global configs should catch this).
The second stream, errorHandledStream, is given configuration that is supposed to catch the error.
The primary ask is that, when an exception occurs within the map method (in this example), some exception handler can perform an action so that the stream does not crash and restart.
This is all with the latest Spring Cloud Stream versions and the following dependencies:
extra["springCloudVersion"] = "2022.0.3"
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
The following references were used:
What am I missing here, and/or what reference can I use to review an implementation? Or is there a working example posted anywhere (or can one be provided here) to use as a starting point?
The capability requested here is being added as part of https://github.com/spring-cloud/spring-cloud-stream/issues/2779.
This is accomplished by using the DltAwareProcessor class: https://github.com/spring-cloud/spring-cloud-stream/blob/main/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java
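Independently of that feature, a common workaround is to catch the exception inside the mapping step itself, so that nothing propagates up and kills the stream thread. The sketch below is plain Java with no Spring or Kafka dependency. The class name SafeMapping and the method guarded are hypothetical names made up for this illustration and are not part of any library. It wraps a mapper so that a failing record produces an empty list and is handed to a callback instead of throwing.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of Spring Cloud Stream or Kafka Streams. */
public final class SafeMapping {

    private SafeMapping() {
    }

    /**
     * Wraps a mapper so that a RuntimeException is reported to onError
     * and the record is dropped (empty list) instead of the exception propagating.
     * A mapper that returns null is treated as a failure as well, because List.of rejects null.
     */
    public static <V, R> Function<V, List<R>> guarded(
            Function<? super V, ? extends R> mapper,
            BiConsumer<? super V, ? super RuntimeException> onError) {
        return value -> {
            try {
                return List.of(mapper.apply(value));
            } catch (RuntimeException e) {
                onError.accept(value, e);
                return List.of();
            }
        };
    }

    public static void main(String[] args) {
        Function<String, List<Integer>> parse = guarded(
                Integer::parseInt,
                (value, e) -> System.err.println("Skipping '" + value + "': " + e.getMessage()));

        System.out.println(parse.apply("42"));           // prints [42]
        System.out.println(parse.apply("not-a-number")); // prints []
    }
}
```

In a topology, a function like this could be passed to KStream#flatMapValues (for example `input.flatMapValues(parse::apply)`), since that operator emits zero or more values per input record. The callback is where you would log the failure or publish the record to a dead-letter topic yourself. Whether dropping the record is acceptable depends on your use case.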