spring-kafkaspring-cloud-streamspring-cloud-stream-binder-kafkakafka-streams-binder

Spring Cloud Streams Error Handling Not Working


There are so few examples of different methods of error-handling in spring cloud streams, and the few that are provided partially via the documentation don't seem to work either.

I have a test repository with multiple methods of error capture attempted, and none of the methods work in any way.

Spring Cloud Streams has reliable deserialization and serialization error handling, but error handling from map, transform and processor methods are very under-documented.

Repository for samples: https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling

I have only two main files

@SpringBootApplication
public class ErrorHandlingApplication {

    public final Random randomNumberGenerator = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        SpringApplication.run(ErrorHandlingApplication.class, args);
    }

    @Bean
    public Supplier<Message<String>> randomIntegerPublisher() {
        return () -> MessageBuilder
                .withPayload(String.valueOf(randomNumberGenerator.nextInt()))
                .setHeader(KafkaHeaders.RECEIVED_KEY, 0)
                .build();
    }

    @Bean
    public Consumer<KStream<String, String>> errorStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on every other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                    throw new RuntimeException("Pushing uncaught error to kill stream!");
                }
        );
    }

    @Bean
    public Consumer<KStream<String, String>> errorHandledStream() {
        return input -> input
                // Remove odd numbers so we throw an exception on ever other message
                .map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
                .filter((key, value) -> (value & 1) == 0)
                .map((key, value) -> {
                            System.out.println("This should not kill the stream");
                            throw new RuntimeException("Publishing error to be caught!");
                        }
                );
    }

    @Bean
    // TODO - doesn't seem to be working, is this because we're using kstreams?
    public Consumer<ErrorMessage> defaultErrorHandler() {
        return v -> {
            System.out.println("Caught and handling error");
            System.out.println(v.toString());
        };
    }

    @Bean
    // TODO - not working via the config
    /**
     * bindings:
     *   errorHandledStream-in-0:
     *     consumer:
     *       commonErrorHandlerBeanName: defaultCommonErrorHandler
     */
    public CommonErrorHandler defaultCommonErrorHandler() {
        return new CommonLoggingErrorHandler();
    }

    /**
     * Also not working
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setCommonErrorHandler(defaultCommonErrorHandler());
        return factory;
    }
}

and

spring:
  cloud:
    function:
      definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
    stream:
      default:
        error-handler-definition: defaultErrorHandler
      kafka:
        streams:
          binder:
            deserialization-exception-handler: logandcontinue
          bindings:
            errorHandledStream-in-0:
              error-handler-definition: defaultErrorHandler
              consumer:
                commonErrorHandlerBeanName: defaultCommonErrorHandler
        bindings:
          errorHandledStream-in-0:
            consumer:
              commonErrorHandlerBeanName: defaultCommonErrorHandler
      bindings:
        randomIntegerPublisher-out-0:
          destination: integer-topic
        errorStream-in-0:
          destination: integer-topic
        errorHandledStream-in-0:
          destination: integer-topic
          error-handler-definition: defaultErrorHandler

Pretty much every documented variation of error handling does not seem to function correctly.

My first stream, errorStream acts as expected. Killing the relevant consumer (although the global configs should catch this).

The second stream, errorHandledStream attempts to have config provided that catches the error.

The primary ask, is when exceptions occur within the map method (for this example), to be able to have some exception handler perform an action so that the stream does not crash and restart.

This is all with the latest spring-cloud-streams versions, and the following dependencies.

extra["springCloudVersion"] = "2022.0.3"

    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

The following references were used:

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling

What am I missing here, and/or what reference can I use to review and implementation. OR, is there a working example posted anywhere (or can be provided here) to use as a starting point?


Solution

  • The capability requested here is being added as part of https://github.com/spring-cloud/spring-cloud-stream/issues/2779.

    This is accomplished by using https://github.com/spring-cloud/spring-cloud-stream/blob/main/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java