Tags: spring-boot, spring-kafka, spring-cloud-stream, kafka-producer-api, spring-cloud-stream-binder-kafka

Spring Cloud Stream 4.0.4 | Kafka Producer | Functional Style


I came across the example below of writing a Kafka producer with Spring Cloud Stream in the functional style.

Dependency

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
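
Producer function

// Supplier comes from java.util.function; the return type must match the object produced.
@Bean
public Supplier<Notification> notificationProducer() {
    return () -> new Notification();
}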

It is bound to a topic in the application configuration file (application.yml) like below:

spring:
  cloud:
    function:
      definition: notificationProducer
    stream:
      bindings:
        notificationProducer-out-0:
          destination: notification-topic

      kafka:
        binder:
          brokers: localhost:54442

Reference: https://www.ideas2it.com/blogs/spring-cloud-streams-using-function-based-model/

Here, the notificationProducer function is messaging-system agnostic. It can be bound to any messaging system (RabbitMQ, ActiveMQ, Kinesis, etc.) just by changing the binder dependency and configuration. This serves my purpose.

The only problem with this example is that the notificationProducer function keeps producing events at a regular interval. I don't want that. I want to produce an event only when I choose to.

Something like the example below:

        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "localhost:54442");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", CustomSerializer.class);

        KafkaProducer<String, Notification> notificationProducer = new KafkaProducer<>(properties);

        Notification notificationEvent = new Notification();
        String notificationTopic = "notification-topic";

        notificationProducer.send(new ProducerRecord<>(notificationTopic, notificationEvent));

In the above example we are using KafkaProducer directly, which makes the code Kafka-specific, and that is what I want to avoid.

So, long story short: using Spring Cloud Stream 4.0.4 (latest), I want to write a producer in the functional style that is messaging-system agnostic and that lets me produce an event whenever I want, not at a regular interval.

I have googled it and gone through many docs, but I cannot find a good example of how to do this.


Solution

  • Indeed, if you use a Supplier, it produces a message at the poller's configured interval (the default is 1 second). If you want full control over when messages are produced, use StreamBridge. The Spring Cloud Stream reference documentation has a note about this, and its section on sending arbitrary data to an output covers StreamBridge in more detail.
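
A minimal sketch of the StreamBridge approach, under some assumptions: the class name NotificationPublisher is hypothetical, Notification is the payload class from the question, and the binding name is reused from the question's configuration so that the existing destination setting (notification-topic) still applies. Nothing here refers to Kafka, so the code stays binder agnostic.

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

// Hypothetical service name; Notification is the payload class from the question.
@Service
public class NotificationPublisher {

    // Assumption: reuses the binding name from the question's configuration, so
    // spring.cloud.stream.bindings.notificationProducer-out-0.destination still applies.
    private static final String BINDING_NAME = "notificationProducer-out-0";

    // StreamBridge is a bean provided by Spring Cloud Stream; it is injected here.
    private final StreamBridge streamBridge;

    public NotificationPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Sends one event on demand; nothing is sent unless this method is called.
    // Returns the boolean result reported by StreamBridge.send.
    public boolean publish(Notification notification) {
        return streamBridge.send(BINDING_NAME, notification);
    }
}
```

Any other bean (a REST controller, a scheduled job, a domain service) could then call publish(new Notification()) at the moment an event should go out. With this approach the Supplier bean and the spring.cloud.function.definition entry from the question would presumably be removed; otherwise the supplier keeps being polled alongside the on-demand sends.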