spring spring-webflux reactive-streams

Spring 6 sending PartEvent from WebClient in IntegrationTest throws ServerWebInputException


Spring 6 introduces PartEvent to handle multipart requests. My example is based on Spring 6 RC2, Java 17, and Maven; see the example code.

The server side is like this.

    @PostMapping("partevents")
    public ResponseEntity<Flux<Object>> handlePartsEvents(@RequestBody Flux<PartEvent> allPartsEvents) {
        var result = allPartsEvents
                .windowUntil(PartEvent::isLast)
                .concatMap(p ->
                        p.switchOnFirst(
                                (signal, partEvents) -> {
                                    if (signal.hasValue()) {
                                        PartEvent event = signal.get();
                                        if (event instanceof FormPartEvent formEvent) {
                                            String value = formEvent.value();
                                            // handle form field
                                            log.debug("form value: {}", value);
                                            return Mono.just(value);

                                        } else if (event instanceof FilePartEvent fileEvent) {
                                            String filename = fileEvent.filename();
                                            log.debug("upload file name:{}", filename);
                                            Flux<DataBuffer> contents = partEvents.map(PartEvent::content);

                                            // handle file upload
                                            var fileBytes = DataBufferUtils.join(contents)
                                                    .map(dataBuffer -> {
                                                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                                                        dataBuffer.read(bytes);
                                                        DataBufferUtils.release(dataBuffer);
                                                        return bytes;
                                                    });

                                            return Mono.just(filename);
                                        } else {
                                            return Mono.error(new RuntimeException("Unexpected event: " + event));
                                        }
                                    }
                                    return partEvents; // either complete or error signal
                                }
                        )
                );

        return ok().body(result);
    }

In the integration test, I used a WebClient to send PartEvent to the server like this.

    @Test
    public void testPartEvents() throws Exception {
        this.client
                .post().uri("/partevents")
                .contentType(MULTIPART_FORM_DATA)
                .body(
                        Flux.concat(
                                FormPartEvent.create("name", "test"),
                                FilePartEvent.create("file", new ClassPathResource("spring.png"))
                        ),
                        PartEvent.class
                )
                .exchangeToFlux(clientResponse -> {
                            assertThat(clientResponse.statusCode()).isEqualTo(HttpStatus.OK);
                            return clientResponse.bodyToFlux(String.class);
                        }
                )
                .as(StepVerifier::create)
                .expectNextCount(2)
                .verifyComplete();
    }

When I ran the tests, I got the following errors.

2022-10-23 21:56:28,530 ERROR [reactor-http-nio-2] org.springframework.web.server.adapter.HttpWebHandlerAdapter: [a48ba684-1] Error [org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST "Failed to read HTTP message"] for HTTP POST "/partevents", but ServerHttpResponse already committed (200 OK)
2022-10-23 21:56:28,532 ERROR [reactor-http-nio-2] reactor.util.Loggers$Slf4JLogger: [a48ba684-1, L:/127.0.0.1:8080 - R:/127.0.0.1:57289] Error finishing response. Closing connection
org.springframework.web.server.ServerWebInputException: 400 BAD_REQUEST "Failed to read HTTP message"
    at org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleReadError(AbstractMessageReaderArgumentResolver.java:224)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Handler com.example.demo.web.FileUploadController#handlePartsEvents(Flux) [DispatcherHandler]
    *__checkpoint ⇢ HTTP POST "/partevents" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.handleReadError(AbstractMessageReaderArgumentResolver.java:224)
        at org.springframework.web.reactive.result.method.annotation.AbstractMessageReaderArgumentResolver.lambda$readBody$0(AbstractMessageReaderArgumentResolver.java:173)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:326)
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerError(FluxConcatMapNoPrefetch.java:297)
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onError(FluxConcatMap.java:875)
        at reactor.core.publisher.FluxSwitchOnFirst$SwitchOnFirstControlSubscriber.onError(FluxSwitchOnFirst.java:955)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2208)
        at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
        at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
        at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
        at reactor.core.publisher.FluxSwitchOnFirst$AbstractSwitchOnFirstMain.onError(FluxSwitchOnFirst.java:572)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.checkTerminated(FluxWindowPredicate.java:765)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:662)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onError(FluxWindowPredicate.java:806)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.signalAsyncError(FluxWindowPredicate.java:352)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:536)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:502)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:432)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onError(FluxWindowPredicate.java:291)
        at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:474)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:802)
        at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:747)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:237)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:213)
        at reactor.core.publisher.FluxCreate$SerializedFluxSink.error(FluxCreate.java:189)
        at org.springframework.http.codec.multipart.MultipartParser.emitError(MultipartParser.java:180)
        at org.springframework.http.codec.multipart.MultipartParser$BodyState.onComplete(MultipartParser.java:609)
        at org.springframework.http.codec.multipart.MultipartParser.hookOnComplete(MultipartParser.java:125)
        at reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
        at reactor.netty5.channel.FluxReceive.terminateReceiver(FluxReceive.java:446)
        at reactor.netty5.channel.FluxReceive.drainReceiver(FluxReceive.java:258)
        at reactor.netty5.channel.FluxReceive.onInboundComplete(FluxReceive.java:382)
        at reactor.netty5.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:423)
        at reactor.netty5.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:577)
        at reactor.netty5.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:110)
        at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
        at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
        at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
        at reactor.netty5.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:251)
        at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
        at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
        at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
        at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
        at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
        at io.netty5.handler.codec.ByteToMessageDecoder$ByteToMessageDecoderContext.fireChannelRead(ByteToMessageDecoder.java:446)
        at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
        at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder$1.fireChannelRead(HttpServerCodec.java:134)
        at io.netty5.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:387)
        at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:116)
        at io.netty5.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:387)
        at io.netty5.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:330)
        at io.netty5.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:204)
        at io.netty5.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:230)
        at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
        at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
        at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
        at io.netty5.channel.ChannelHandler.channelRead(ChannelHandler.java:235)
        at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
        at io.netty5.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:838)
        at io.netty5.channel.AbstractChannel$ReadSink.processRead(AbstractChannel.java:1975)
        at io.netty5.channel.nio.AbstractNioByteChannel.doReadNow(AbstractNioByteChannel.java:74)
        at io.netty5.channel.AbstractChannel$ReadSink.readLoop(AbstractChannel.java:2035)
        at io.netty5.channel.AbstractChannel.readNow(AbstractChannel.java:910)
        at io.netty5.channel.nio.AbstractNioChannel.access$100(AbstractNioChannel.java:42)
        at io.netty5.channel.nio.AbstractNioChannel$1.handle(AbstractNioChannel.java:108)
        at io.netty5.channel.nio.NioHandler.processSelectedKey(NioHandler.java:506)
        at io.netty5.channel.nio.NioHandler.processSelectedKeysOptimized(NioHandler.java:489)
        at io.netty5.channel.nio.NioHandler.processSelectedKeys(NioHandler.java:430)
        at io.netty5.channel.nio.NioHandler.run(NioHandler.java:407)
        at io.netty5.channel.SingleThreadEventLoop.runIO(SingleThreadEventLoop.java:192)
        at io.netty5.channel.SingleThreadEventLoop.run(SingleThreadEventLoop.java:176)
        at io.netty5.util.concurrent.SingleThreadEventExecutor.lambda$doStartThread$4(SingleThreadEventExecutor.java:774)
        at io.netty5.util.internal.ThreadExecutorMap.lambda$apply$1(ThreadExecutorMap.java:68)
        at io.netty5.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.core.codec.DecodingException: Could not find end of body (␍␊--IPfFMKLDj72s9d-CAWfR9E36mWg2R8ubNDR2S-h)
    at org.springframework.http.codec.multipart.MultipartParser$BodyState.onComplete(MultipartParser.java:609)
    at org.springframework.http.codec.multipart.MultipartParser.hookOnComplete(MultipartParser.java:125)
    at reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.netty5.channel.FluxReceive.terminateReceiver(FluxReceive.java:446)
    at reactor.netty5.channel.FluxReceive.drainReceiver(FluxReceive.java:258)
    at reactor.netty5.channel.FluxReceive.onInboundComplete(FluxReceive.java:382)
    at reactor.netty5.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:423)
    at reactor.netty5.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:577)
    at reactor.netty5.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:110)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at reactor.netty5.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:251)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.handler.codec.ByteToMessageDecoder$ByteToMessageDecoderContext.fireChannelRead(ByteToMessageDecoder.java:446)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder$1.fireChannelRead(HttpServerCodec.java:134)
    at io.netty5.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:387)
    at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:116)
    at io.netty5.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:387)
    at io.netty5.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:330)
    at io.netty5.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:204)
    at io.netty5.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:230)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at io.netty5.channel.ChannelHandler.channelRead(ChannelHandler.java:235)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:838)
    at io.netty5.channel.AbstractChannel$ReadSink.processRead(AbstractChannel.java:1975)
    at io.netty5.channel.nio.AbstractNioByteChannel.doReadNow(AbstractNioByteChannel.java:74)
    at io.netty5.channel.AbstractChannel$ReadSink.readLoop(AbstractChannel.java:2035)
    at io.netty5.channel.AbstractChannel.readNow(AbstractChannel.java:910)
    at io.netty5.channel.nio.AbstractNioChannel.access$100(AbstractNioChannel.java:42)
    at io.netty5.channel.nio.AbstractNioChannel$1.handle(AbstractNioChannel.java:108)
    at io.netty5.channel.nio.NioHandler.processSelectedKey(NioHandler.java:506)
    at io.netty5.channel.nio.NioHandler.processSelectedKeysOptimized(NioHandler.java:489)
    at io.netty5.channel.nio.NioHandler.processSelectedKeys(NioHandler.java:430)
    at io.netty5.channel.nio.NioHandler.run(NioHandler.java:407)
    at io.netty5.channel.SingleThreadEventLoop.runIO(SingleThreadEventLoop.java:192)
    at io.netty5.channel.SingleThreadEventLoop.run(SingleThreadEventLoop.java:176)
    at io.netty5.util.concurrent.SingleThreadEventExecutor.lambda$doStartThread$4(SingleThreadEventExecutor.java:774)
    at io.netty5.util.internal.ThreadExecutorMap.lambda$apply$1(ThreadExecutorMap.java:68)
    at io.netty5.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
2022-10-23 21:56:28,579 ERROR [reactor-http-nio-2] reactor.util.Loggers$Slf4JLogger: Operator called default onErrorDropped
org.springframework.core.codec.DecodingException: Could not find end of body (␍␊--IPfFMKLDj72s9d-CAWfR9E36mWg2R8ubNDR2S-h)
    at org.springframework.http.codec.multipart.MultipartParser$BodyState.onComplete(MultipartParser.java:609)
    at org.springframework.http.codec.multipart.MultipartParser.hookOnComplete(MultipartParser.java:125)
    at reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.netty5.channel.FluxReceive.terminateReceiver(FluxReceive.java:446)
    at reactor.netty5.channel.FluxReceive.drainReceiver(FluxReceive.java:258)
    at reactor.netty5.channel.FluxReceive.onInboundComplete(FluxReceive.java:382)
    at reactor.netty5.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:423)
    at reactor.netty5.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:577)
    at reactor.netty5.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:110)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at reactor.netty5.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:251)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.handler.codec.ByteToMessageDecoder$ByteToMessageDecoderContext.fireChannelRead(ByteToMessageDecoder.java:446)
    at io.netty5.channel.internal.DelegatingChannelHandlerContext.fireChannelRead(DelegatingChannelHandlerContext.java:113)
    at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder$1.fireChannelRead(HttpServerCodec.java:134)
    at io.netty5.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:387)
    at io.netty5.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:116)
    at io.netty5.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:387)
    at io.netty5.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:330)
    at io.netty5.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:204)
    at io.netty5.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:230)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelHandlerContext.findAndInvokeChannelRead(DefaultChannelHandlerContext.java:445)
    at io.netty5.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:426)
    at io.netty5.channel.ChannelHandler.channelRead(ChannelHandler.java:235)
    at io.netty5.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:455)
    at io.netty5.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:838)
    at io.netty5.channel.AbstractChannel$ReadSink.processRead(AbstractChannel.java:1975)
    at io.netty5.channel.nio.AbstractNioByteChannel.doReadNow(AbstractNioByteChannel.java:74)
    at io.netty5.channel.AbstractChannel$ReadSink.readLoop(AbstractChannel.java:2035)
    at io.netty5.channel.AbstractChannel.readNow(AbstractChannel.java:910)
    at io.netty5.channel.nio.AbstractNioChannel.access$100(AbstractNioChannel.java:42)
    at io.netty5.channel.nio.AbstractNioChannel$1.handle(AbstractNioChannel.java:108)
    at io.netty5.channel.nio.NioHandler.processSelectedKey(NioHandler.java:506)
    at io.netty5.channel.nio.NioHandler.processSelectedKeysOptimized(NioHandler.java:489)
    at io.netty5.channel.nio.NioHandler.processSelectedKeys(NioHandler.java:430)
    at io.netty5.channel.nio.NioHandler.run(NioHandler.java:407)
    at io.netty5.channel.SingleThreadEventLoop.runIO(SingleThreadEventLoop.java:192)
    at io.netty5.channel.SingleThreadEventLoop.run(SingleThreadEventLoop.java:176)
    at io.netty5.util.concurrent.SingleThreadEventExecutor.lambda$doStartThread$4(SingleThreadEventExecutor.java:774)
    at io.netty5.util.internal.ThreadExecutorMap.lambda$apply$1(ThreadExecutorMap.java:68)
    at io.netty5.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

However, the unit test using WebTestClient passes.
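
A note on the DecodingException in the log above: the text in parentheses (␍␊-- followed by the boundary) is the delimiter that ends a part body in a multipart message, so the error means the request completed before that delimiter was seen. The sketch below is plain Java, not Spring's MultipartParser; the class, the method names, and the boundary value are made up for illustration, and it handles a single form field only.

```java
class MultipartDelimiterSketch {

    // A part body ends at CRLF + "--" + boundary. This is the string that the
    // DecodingException message prints in parentheses.
    static String bodyDelimiter(String boundary) {
        return "\r\n--" + boundary;
    }

    // Builds a message with one form field. When 'closed' is false the closing
    // delimiter is left out, imitating a request body that ends too early.
    static String singleFieldMessage(String boundary, String name, String value, boolean closed) {
        StringBuilder sb = new StringBuilder();
        sb.append("--").append(boundary).append("\r\n");
        sb.append("Content-Disposition: form-data; name=\"").append(name).append("\"\r\n");
        sb.append("\r\n"); // blank line: end of the part headers
        sb.append(value);
        if (closed) {
            sb.append(bodyDelimiter(boundary)).append("--\r\n"); // close delimiter
        }
        return sb.toString();
    }

    // Returns true if the part body is followed by the delimiter.
    static boolean hasEndOfBody(String message, String boundary) {
        int headersEnd = message.indexOf("\r\n\r\n");
        if (headersEnd < 0) {
            return false;
        }
        return message.indexOf(bodyDelimiter(boundary), headersEnd + 4) >= 0;
    }

    public static void main(String[] args) {
        String boundary = "example-boundary"; // made-up value
        System.out.println(hasEndOfBody(singleFieldMessage(boundary, "name", "test", true), boundary));  // true
        System.out.println(hasEndOfBody(singleFieldMessage(boundary, "name", "test", false), boundary)); // false
    }
}
```

The second call corresponds to the situation the server reported: the body is there, but nothing after it marks where it ends.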


Solution

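  • With help from the Spring developers, it turned out that several problems caused the issue.

    1. There was a bug in the new JdkClientHttpConnector, fixed in Spring 6.0.0-RC3.
    2. We have to consume the contents of the FilePartEvent, even though this demo has no further step that processes them. In the handler above, fileBytes is assembled but never subscribed to, so the file content is never read.
    3. In the formEvent and fileEvent handling, we have to append an extra \n to the text, so that the values from Mono.just() can be split back into separate Flux elements when the response is read (the test expects two).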
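
Point 3 can be illustrated without Spring. The sketch below is plain Java and only imitates reading a text body line by line; the class and the splitLines helper are made up for this illustration and are not part of Spring or Reactor. It assumes the response text is split into elements on \n, which is why two values written without a newline come back as one element.

```java
import java.util.ArrayList;
import java.util.List;

class NewlineDelimiterSketch {

    // Hypothetical helper: joins the chunks as they would appear in the
    // response body, then splits the text on '\n'.
    static List<String> splitLines(List<String> chunks) {
        String body = String.join("", chunks);
        List<String> elements = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = body.indexOf('\n', start)) >= 0) {
            elements.add(body.substring(start, idx));
            start = idx + 1;
        }
        if (start < body.length()) {
            elements.add(body.substring(start)); // trailing text without a newline
        }
        return elements;
    }

    public static void main(String[] args) {
        // Without a delimiter the two values run together into one element.
        System.out.println(splitLines(List.of("test", "spring.png")));     // [testspring.png]
        // With "\n" appended, each value comes back as its own element.
        System.out.println(splitLines(List.of("test\n", "spring.png\n"))); // [test, spring.png]
    }
}
```

With the newline appended on the server, expectNextCount(2) in the test has two elements to count instead of one.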