I'm using project reactor and I'd like to perform the following:
@Override
public void run(ApplicationArguments args) {
Flux.from(KafkaReceiver.create(receiverOptions)
.receive()
.map(this::getObject)
.flatMap(this::iterateElasticWrites)
.flatMap(this::writeTheWholeObjectToS3)
).subscribe();
}
// What I'd like to do - but a non reactive code
private Publisher<MyObj> iterateElasticWrites(MyObj message) {
for (MyDoc file: message.getDocs()) {
writeElasticDoc(file.getText());
}
return Mono.just(message);
}
I'm struggling in finding out the equivalent of the iterateElasticWrites
in Project Reactor. I'd like to perform an iteration of an object of mine (MyObj
), and write each of its documents list's element into elasticsearch reactively.
In Reactor you always need to construct a reactive flow using different operators and all reactive/async code should return Mono
or Flux
.
Looking at your example it could look like
private Mono<MyObj> iterateElasticWrites(MyObj message) {
return Flux.fromIterable(message.getDocs())
.flatMap(doc -> writeElasticDoc(doc.getText()))
.thenReturn(message);
}
where writeElasticDoc
could be defined as
private Mono<Void> writeElasticDoc(String text) {
...
}