javaspring-cloudspring-cloud-streamspring-cloud-stream-binder-kafkaspring-messaging

Producer callback in Spring Cloud Stream using StreamBridge


I'm wondering a way to perform a callback using StreamBridge, I want to do something similar to KafkaTemplate.send that returns a ListenableFuture.

Is it possible with spring cloud stream to publish some events using kafka binder and use a callback like onSuccess and onFailure?

example: producer.send(record, new callback { ... })


Solution

  • You can either set sync on the producer binding and the send will wait internally on the future completion, or you can configure a recordMetadataChannel to get the results of the send asynchronously.

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties

    recordMetadataChannel

    The bean name of a MessageChannel to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional header KafkaHeaders.RECORD_METADATA. The header contains a RecordMetadata object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.

    ResultMetadata meta = 
        sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
    

    Failed sends go the producer error channel (if configured); see Error Channels.

    https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-error-channels

    EDIT

    Here's an example:

    spring.cloud.stream.bindings.output-out-0.destination=dest1
    spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
    spring.cloud.stream.kafka.bindings.output-out-0.producer.record-metadata-channel=meta
    spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[max.block.ms]=5000
    spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[request.timeout.ms]=5000
    spring.cloud.stream.kafka.bindings.output-out-0.producer.configuration.[retries]=0
    
    @SpringBootApplication
    public class So72900966Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So72900966Application.class, args);
        }
    
        @Bean
        ApplicationRunner runner(StreamBridge bridge) {
            return args -> {
                bridge.send("output-out-0", "foo");
                System.out.println("Delete topic dest1 from broker; then hit Enter");
                System.in.read();
                bridge.send("output-out-0", "foo");
                Thread.sleep(2_000);
            };
        }
    
    }
    
    @Component
    class ResultHandler {
    
        @ServiceActivator(inputChannel = "meta")
        void meta(Message<?> result) {
            System.out.println(result.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
        }
    
        @ServiceActivator(inputChannel = "errorChannel")
        void errors(Message<?> error) {
            System.out.println(error);
        }
    
    }
    

    After the first result is received:

    kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic dest1
    

    Then hit enter.

    Result:

    Delete topic dest1 from broker; then hit Enter
    dest1-0@0
    ...
    ErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: ...
    2022-07-07 13:36:19.185 ERROR 11735 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='byte[3]' to topic dest1:
    
    org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.