I have an endpoint where I call a Spring service to get the status of an order. The service returns an enum with the values Processing, Completed, or Error.
@GetMapping(value = ["/{orderId}/state"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun orderState(@PathVariable("orderId") orderId: Long): Flux<OrderStateDetails> {
    return Flux.interval(Duration.ofSeconds(1))
        .take(30)
        .map { orderService.orderState(orderId) }
        .takeWhile { orderStateDetails -> orderStateDetails.state == OrderState.Processing }
}
I'm using a Flux to implement low-tech polling that runs until the state is no longer Processing, for at most 30 seconds. I want the endpoint to return every polled state, including the final one.
But takeWhile does not emit the last item, Completed. I tried adding .also { orderService.orderState(orderId) } and .concatWith(Flux.just(orderService.orderState(orderId))), but that makes an extra, unnecessary call and still does not emit the item.
How can I make it emit the last item as well?
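Instead of takeWhile(), which completes without emitting the first item that fails your predicate, you should use takeUntil(), which emits the item that matches its predicate and then completes. Note that the condition is inverted: takeUntil() takes the stop condition, not the continue condition.
Here's a simple example in Java, but you can easily adapt it to Kotlin and your code: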
Flux.just("Processing", "Processing", "Processing", "Completed")
    .delayElements(Duration.ofMillis(1000))
    .takeUntil(status -> status.equals("Completed"))
    .subscribe(System.out::println);
This will print the following output:
Processing
Processing
Processing
Completed
So I think this is what you want to achieve.
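Applied to the polling setup from the question, the same idea might look like the sketch below. It is only an illustration under assumptions: the OrderState enum, the pollState helper, and the Supplier standing in for orderService.orderState(orderId) are hypothetical names, and the interval is shortened so the demo finishes quickly. In the Kotlin endpoint, the equivalent change would presumably be replacing the takeWhile line with .takeUntil { it.state != OrderState.Processing }.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import reactor.core.publisher.Flux;

public class OrderStatePolling {

    // Hypothetical stand-in for the order state enum described in the question.
    enum OrderState { PROCESSING, COMPLETED, ERROR }

    // Calls the supplier once per tick, at most maxPolls times, and stops after
    // the first state that is not PROCESSING. takeUntil relays the matching
    // item before completing, so the final state is part of the output.
    static Flux<OrderState> pollState(Supplier<OrderState> service, Duration interval, int maxPolls) {
        return Flux.interval(interval)
                .take(maxPolls)
                .map(tick -> service.get())
                .takeUntil(state -> state != OrderState.PROCESSING);
    }

    public static void main(String[] args) {
        // Simulated service responses (an assumption for this demo only).
        Iterator<OrderState> fake = List.of(
                OrderState.PROCESSING,
                OrderState.PROCESSING,
                OrderState.COMPLETED,
                OrderState.ERROR).iterator();

        // Block only for the demo; a WebFlux endpoint would return the Flux directly.
        List<OrderState> seen = pollState(fake::next, Duration.ofMillis(10), 30)
                .collectList()
                .block();

        System.out.println(seen); // prints [PROCESSING, PROCESSING, COMPLETED]
    }
}
```

The service is called exactly once per tick, so there is no extra call after the terminal state, and the take(maxPolls) cap still ends the stream if the order never leaves PROCESSING.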