Tags: spring-boot, kotlin, reactive-programming, spring-kafka, reactor-kafka

Spring Boot WebFlux with Kafka


I'm having a hard time trying to understand a codebase written in Kotlin that uses Spring Boot WebFlux and reads messages from a Kafka topic using spring-kafka.

The signature of the listener is the following:

import org.springframework.kafka.annotation.KafkaListener
import reactor.core.Disposable


interface CreationEventConsumer {
    @KafkaListener(
        topics = ["\${topic.east}", "\${topic.west}"],
        containerFactory = "creationFactory"
    )
    fun readEvent(record: String): Disposable
}

As you can see in the signature, the readEvent method returns a Disposable.

So I don't understand why the function has to return anything, considering that this is just a one-way flow. Does Spring do something with that value? Are there any benefits?

The implementation of the method is the following:

    override fun readEvent(record: String): Disposable {
        logger.info("$INCOMING")
        return creationEventService.process(record)
            .doOnError {
                logger.error("error")
            }.retry(3)
            .subscribe()
    }

The creationEventService.process(record) call returns a Flux.

I also have more questions, because there is a dedicated library (reactor-kafka) that adds support for reactive programming with Kafka, but this project is using spring-kafka.

Thanks!


Solution

  • Makes no sense to me either; Spring knows nothing about a Disposable.

    Returning a value (of any kind) from a @KafkaListener method does nothing without a @SendTo annotation - with @SendTo, the "reply" is sent to the topic named by the annotation (or by the reply-topic header). If there is no @SendTo, the returned object is ignored (with a DEBUG log).
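    For illustration, here is a hedged sketch of the only case where a @KafkaListener return value matters: a request/reply listener. The topic names, class name, and containerFactory are made up for this example.

    ```kotlin
    import org.springframework.kafka.annotation.KafkaListener
    import org.springframework.messaging.handler.annotation.SendTo

    class ReplyingConsumer {
        // Hypothetical example: the return value is only meaningful
        // because of @SendTo; without that annotation, Spring ignores
        // the result and logs it at DEBUG level.
        @KafkaListener(topics = ["requests"], containerFactory = "creationFactory")
        @SendTo("replies") // reply is published to the "replies" topic
        fun handle(record: String): String {
            return record.uppercase()
        }
    }
    ```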

    In this case, the record's offset will be committed regardless of the success or failure of the async operation, because subscribe() returns immediately and the listener method exits before processing completes.
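    One common workaround (a sketch, not part of the original code) is to block the listener thread until the pipeline completes. The method then returns nothing: if the Flux still fails after the retries, the exception propagates to the container, whose error handler decides what happens to the offset.

    ```kotlin
    override fun readEvent(record: String) {
        // Blocking ties the outcome of the reactive pipeline to the
        // listener invocation; assumption: blocking the listener
        // thread is acceptable here.
        creationEventService.process(record)
            .doOnError { logger.error("error", it) }
            .retry(3)
            .blockLast()
    }
    ```

    This trades away the non-blocking benefit of Reactor inside the listener, which is one more reason reactor-kafka is the better fit for a fully reactive pipeline.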

    You are correct; reactor-kafka should be used for reactive workloads, not Spring for Apache Kafka.
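    For comparison, a minimal reactor-kafka consumer sketch. The broker address, group id, topic names, and the creationEventService reference are assumptions for illustration; the key point is that the offset is acknowledged only after the processing Flux completes.

    ```kotlin
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import reactor.core.Disposable
    import reactor.kafka.receiver.KafkaReceiver
    import reactor.kafka.receiver.ReceiverOptions

    fun consume(): Disposable {
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092", // assumption
            ConsumerConfig.GROUP_ID_CONFIG to "creation-events",         // assumption
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
        )
        val options = ReceiverOptions.create<String, String>(props)
            .subscription(listOf("topic-east", "topic-west")) // assumed topic names

        return KafkaReceiver.create(options)
            .receive()
            .concatMap { record ->
                creationEventService.process(record.value()) // assumed service
                    .retry(3)
                    .doOnComplete { record.receiverOffset().acknowledge() } // ack only on success
            }
            .subscribe()
    }
    ```

    Here the whole flow is one reactive pipeline, so backpressure and error handling apply end to end instead of stopping at the listener-method boundary.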