I came across the example below of writing a Kafka producer with Spring Cloud Stream in functional style.
Dependencies:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
@Bean
public Supplier<Notification> notificationProducer() {
    return () -> new Notification();
}
It is bound to a topic in the application.yml file like this:
spring:
  cloud:
    function:
      definition: notificationProducer
    stream:
      bindings:
        notificationProducer-out-0:
          destination: notification-topic
      kafka:
        binder:
          brokers: localhost:54442
Reference: https://www.ideas2it.com/blogs/spring-cloud-streams-using-function-based-model/
Here, the notificationProducer function is messaging-system agnostic. It can be bound to any messaging system (RabbitMQ, ActiveMQ, Kinesis, etc.) just by changing the binder dependency and configuration. This serves my purpose.
The only problem with this example is that the notificationProducer function keeps producing messages at a regular interval. I don't want that. I want to produce an event only when I choose to.
Something like the example below:
Map<String, Object> properties = new HashMap<>();
properties.put("bootstrap.servers", "localhost:54442");
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", CustomSerializer.class);
KafkaProducer<String, Notification> notificationProducer = new KafkaProducer<>(properties);
Notification notificationEvent = new Notification();
String notificationTopic = "notification-topic";
notificationProducer.send(new ProducerRecord<>(notificationTopic, notificationEvent));
In the above example we are using KafkaProducer, which makes the code Kafka-specific, and I don't want that.
Long story short: using Spring Cloud Stream 4.0.4 (latest), I want to write a producer in functional style that is messaging-system agnostic and that lets me produce an event whenever I want, not at a regular interval.
I have googled this and gone through many docs, but I cannot find a good example of how to do it.
Indeed, if you use a Supplier, it will produce a message at the poller's configured interval (the default is 1 second). If you want full control over when messages are produced, use StreamBridge. The Spring Cloud Stream reference documentation has a note about this, and its StreamBridge section provides more details.
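To illustrate the on-demand idea, here is a minimal sketch rather than a definitive implementation. It assumes the Supplier bean and the spring.cloud.function.definition entry are removed, and that the binding name notificationProducer-out-0 from the question's configuration is kept. The names NotificationSender, OutputPort and notifyNow are hypothetical, and Notification is a stand-in for the question's class. OutputPort has the same shape as StreamBridge.send(String bindingName, Object data), so in a Spring application it could presumably be satisfied with the method reference streamBridge::send on an injected StreamBridge. The main method uses an in-memory stand-in so the sketch runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

public class NotificationSender {

    /** Hypothetical port; StreamBridge.send(String, Object) has the same shape. */
    @FunctionalInterface
    public interface OutputPort {
        boolean send(String bindingName, Object payload);
    }

    /** Stand-in for the Notification class from the question. */
    public static class Notification {
        public final String text;

        public Notification(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return "Notification(" + text + ")";
        }
    }

    // Binding name taken from the question's configuration; the destination
    // (topic, exchange, ...) stays in the application configuration.
    static final String BINDING = "notificationProducer-out-0";

    private final OutputPort port;

    public NotificationSender(OutputPort port) {
        this.port = port;
    }

    /** Sends exactly one event, only when the caller asks for it. */
    public boolean notifyNow(String text) {
        return port.send(BINDING, new Notification(text));
    }

    public static void main(String[] args) {
        // In-memory stand-in for the real output, so no broker is needed here.
        List<Object> sent = new ArrayList<>();
        NotificationSender sender =
                new NotificationSender((binding, payload) -> sent.add(payload));

        sender.notifyNow("order shipped");
        System.out.println(sent);
    }
}
```

Because the sender only knows a binding name, switching from Kafka to another binder should remain a dependency and configuration change, as with the Supplier version.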