Tags: spring, spring-boot, kotlin, spring-integration

Spring Integration WebFlux - Kotlin DSL: Consuming SSE


I'm currently porting a previously working application, which consumes Server-Sent Events (SSE) using Spring Integration WebFlux, from Java to Kotlin.

The following worked in Java with Spring Boot 3.1.4:

@Bean
public IntegrationFlow handleEvents(@Qualifier("eventStreamWebClient") WebClient webClient) {
    return integrationFlow -> integrationFlow
        .channel(requestChannel())
        .handle(
            WebFlux.outboundGateway("/eventstream/clip/v2", webClient)
                .httpMethod(HttpMethod.GET)
                .replyPayloadToFlux(true)
                .expectedResponseType(new ParameterizedTypeReference<ServerSentEvent<List<PhilipsHueEvent>>>() {
                }))
        .split()
        .split(ServerSentEvent.class, ServerSentEvent::data)
        .split(PhilipsHueEvent.class, PhilipsHueEvent::eventData)
        .<Object, Class<?>>route(Object::getClass,
            router -> router.channelKeyFallback(false)
                .channelMapping(PhilipsHueLightEvent.class, lightEventChannel())
                .channelMapping(PhilipsHueTemperatureEvent.class, temperatureEventChannel())
                .defaultOutputToParentFlow())
        .log(LoggingHandler.Level.INFO, Message::getPayload)
        .nullChannel();
}

However, the same flow written with the Kotlin DSL on Spring Boot 4.0.0-M2 doesn't work:

@Bean
fun handleEventStream(eventStreamWebClient: WebClient): IntegrationFlow =
    integrationFlow {
        channel(requestChannel())
        handle(
            WebFlux.outboundGateway("/eventstream/clip/v2", eventStreamWebClient)
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(object : ParameterizedTypeReference<ServerSentEvent<List<HueEvent>>>() {})
                .replyPayloadToFlux(true)
        )
        split()
        split<ServerSentEvent<List<HueEvent>>> { it.data()!! }
        split<HueEvent> { it.eventData }
        route<Object, Class<*>>(Object::getClass) {
            channelKeyFallback(false)
            channelMapping(HueLightEvent::class.java, lightEventChannel())
            channelMapping(HueTemperatureEvent::class.java, temperatureEventChannel())
            defaultOutputToParentFlow()
        }
        log(LoggingHandler.Level.INFO) { it.payload }
        channel("nullChannel")
    }

It fails with the following exception:

Caused by: java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-3
    at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:168)
    at org.springframework.integration.splitter.AbstractMessageSplitter.prepareIteratorResult(AbstractMessageSplitter.java:210)
    at org.springframework.integration.splitter.AbstractMessageSplitter.handleRequestMessage(AbstractMessageSplitter.java:139)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:146)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)

Setting a breakpoint in AbstractMessageSplitter on the line boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel; shows that the actual output channel is a DirectChannel. The splitter therefore attempts to split an Iterator<?> instead of a Flux<?>.

Am I missing a step in the migration of Spring Integration (Boot version: 3.1.4 -> 4.0.0-M2), or is this simply not possible with the Kotlin DSL?


Solution

  • The problem is here:

        split()
        split<ServerSentEvent<List<HueEvent>>> { it.data()!! }
    

    Since you don't specify a channel between these two splitters, a default DirectChannel is used, which leads to the blocking iteration seen in the stack trace:

        at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:168)
        at org.springframework.integration.splitter.AbstractMessageSplitter.prepareIteratorResult(AbstractMessageSplitter.java:210)
    

    The Java configuration behaves the same way. Perhaps the older Spring Boot version was more forgiving at that point.

    Consider adding this:

    channel(FluxMessageChannel())
    

    in between those split operators.
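
    For context, here is a sketch of how the beginning of the flow from the question might look with that channel in place. It is not a verified fix. It reuses requestChannel(), HueEvent and the eventStreamWebClient bean from the question, none of which are defined here, and it assumes the split calls still compile as written in the question. The only change is the extra channel(FluxMessageChannel()) line.

    ```kotlin
    import org.springframework.context.annotation.Bean
    import org.springframework.core.ParameterizedTypeReference
    import org.springframework.http.HttpMethod
    import org.springframework.http.codec.ServerSentEvent
    import org.springframework.integration.channel.FluxMessageChannel
    import org.springframework.integration.dsl.IntegrationFlow
    import org.springframework.integration.dsl.integrationFlow
    import org.springframework.integration.webflux.dsl.WebFlux
    import org.springframework.web.reactive.function.client.WebClient

    // Assumption: requestChannel() and HueEvent come from the question's project.
    @Bean
    fun handleEventStream(eventStreamWebClient: WebClient): IntegrationFlow =
        integrationFlow {
            channel(requestChannel())
            handle(
                WebFlux.outboundGateway("/eventstream/clip/v2", eventStreamWebClient)
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(object : ParameterizedTypeReference<ServerSentEvent<List<HueEvent>>>() {})
                    .replyPayloadToFlux(true)
            )
            // First splitter receives the Flux payload from the gateway.
            split()
            // Explicit reactive output channel for that splitter, replacing the
            // default DirectChannel, so its "reactive" check passes.
            channel(FluxMessageChannel())
            split<ServerSentEvent<List<HueEvent>>> { it.data()!! }
            split<HueEvent> { it.eventData }
            // The route, log and nullChannel steps stay as in the question.
        }
    ```

    FluxMessageChannel implements ReactiveStreamsSubscribableChannel, which is the type the splitter tests for on the line mentioned in the question, so the Flux should be consumed reactively rather than through a blocking iterator on the reactor-http-nio thread.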