I am using Spring 5, specifically the Reactor project, to read data from a huge Mongo collection and publish it to a Kafka topic. Unfortunately, the Kafka messages are produced much faster than the downstream program can consume them, so I need to implement some kind of backpressure mechanism.
Suppose I want a throughput of 100 messages per second. After some googling, I decided to combine the buffer(int maxSize) method with a zip: the buffered result is zipped with a Flux that emits a tick at a predefined interval.
// Create a clock that emits an event every second
final Flux<Long> clock = Flux.interval(Duration.ofMillis(1000L));

// Create a buffered producer
final Flux<ProducerRecord<String, Data>> outbound =
    repository.findAll()
        .map(this::buildData)
        .map(this::createKafkaMessage)
        .buffer(100)
        // Limit the emission to one batch per clock tick
        .zipWith(clock, (msgs, tick) -> msgs)
        .flatMap(Flux::fromIterable);

// Subscribe a Kafka sender
kafkaSender.createOutbound()
    .send(outbound)
    .then()
    .block();
Is there a smarter way to do this? It seems a little complex to me, especially the zip part.
Yes, you can use the delayElements(Duration.ofSeconds(1)) operator directly, without needing zipWith. Applied after buffer(100), it delays each batch of 100 messages by one second. Reactor is a great project that is continuously being improved, so it is worth sticking with it :) Hope this was helpful!
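As a minimal sketch of the delayElements suggestion, the snippet below replaces the clock and the zip with a single operator placed after buffer. It assumes only reactor-core on the classpath. The class name ThrottledBatches and the helper throttle are hypothetical names introduced for illustration, and Flux.range stands in for the repository and the Kafka records from the question.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor or of the original code.
final class ThrottledBatches {

    // Groups the source into batches of batchSize and releases one batch
    // per period, then flattens the batches back into single elements.
    static <T> Flux<T> throttle(Flux<T> source, int batchSize, Duration period) {
        return source
                .buffer(batchSize)                  // Flux<List<T>>
                .delayElements(period)              // delay each batch by the period
                .flatMapIterable(batch -> batch);   // back to Flux<T>, order preserved
    }

    public static void main(String[] args) {
        // Stand-in for repository.findAll() mapped to Kafka messages.
        Flux<Integer> source = Flux.range(1, 10);

        // Two batches of 5, each delayed by 100 ms.
        List<Integer> result = throttle(source, 5, Duration.ofMillis(100))
                .collectList()
                .block();

        System.out.println(result);
    }
}
```

In the original pipeline this would correspond to calling .delayElements(Duration.ofSeconds(1)) right after .buffer(100). Note that delayElements also delays the first batch, and that the resulting rate is an upper bound: if the source or the sender is slower, throughput drops below 100 messages per second.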