springkotlinfluxreactor

Spring Reactor - Emit last item


I have an endpoint where I call a Spring spring service to get status of an order. The service returns and Enum with the values: Processing, Completed or Error.

@GetMapping(value = ["/{orderId}/state"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun orderState(@PathVariable("orderId") orderId: Long): Flux<OrderStateDetails> {
    return Flux.interval(Duration.ofSeconds(1))
        .take(30)
        .map { orderService.orderState(orderId) }
        .takeWhile { orderStateDetails -> orderStateDetails.state == OrderState.Procession }
}

I'm using a Flux to implement a low tech polling until state is no longer Processing and no longer than 30 seconds. I wan't the endpoint to return:

But the takeWhile is not emitting the last item Completed. I tried to add .also { orderService.orderState(orderId) } and .concatWith(Flux.just(orderService.orderState(orderId))) but that will make an extra unnecessary call and also does not emit the item. How can I make it emit the last items as well?


Solution

  • Instead of using takeWhile() which does not give you the !result of your predicate, you should use takeUntil()

    Here's a simple example in Java, but you can easily adapt it to Kotlin and your code:

    Flux.just("Processing", "Processing", "Processing", "Completed")
                    .delayElements(Duration.ofMillis(1000))
                    .takeUntil(status -> status.equals("Completed"))
                    .subscribe(System.out::println);
    

    This will print the following output:

    Processing
    Processing
    Processing
    Completed
    

    So I think this is what you want to achieve.