I'm developing a Kotlin microservice that uses coroutines, and one of the libraries I'm using (reactor.kafka) expects a Flux publisher.
How can I bridge the two? (I already use kotlinx-coroutines-reactive and kotlinx-coroutines-reactor to bridge in the opposite direction: when consuming an event, I open a mono {} block and call a suspending function. In this case it's the other way around.)
The function I'm trying to call:
kafkaSender.send(Flux.just(SenderRecord.create(record, "0")))
At first glance, we do not see bridging methods in kotlinx-coroutines-reactor to await from a Flux. But:
Note that Mono and Flux are subclasses of Reactive Streams' Publisher and extensions for it are covered by the kotlinx-coroutines-reactive module.
I think you can simply call .awaitLast() on the Flux returned by send.
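As a rough sketch of what that could look like, assuming reactor-kafka and kotlinx-coroutines-reactive are on the classpath: the extension function name sendAndAwait is hypothetical (it is not part of either library), and the "0" correlation metadata is taken from the call in the question.

```kotlin
import kotlinx.coroutines.reactive.awaitLast
import org.apache.kafka.clients.producer.ProducerRecord
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult

// Hypothetical helper, not a library API: sends a single record and
// suspends the calling coroutine until the resulting Flux completes.
suspend fun <K, V> KafkaSender<K, V>.sendAndAwait(
    record: ProducerRecord<K, V>
): SenderResult<String> =
    // send() returns a Flux<SenderResult<String>>; since Flux is a
    // Reactive Streams Publisher, awaitLast() from
    // kotlinx-coroutines-reactive applies to it directly.
    send(Flux.just(SenderRecord.create(record, "0")))
        .awaitLast()
```

From a suspending function you would then call kafkaSender.sendAndAwait(record) instead of subscribing manually. Note that awaitLast() throws NoSuchElementException if the publisher completes without emitting anything, and rethrows any error the Flux signals, so a failed send surfaces as an exception in the coroutine.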