java spring-boot spring-webflux project-reactor reactor-netty

Reactor Netty: Connection prematurely closed BEFORE response with WebClient


While processing data retrieved with the WebClient, I get a "Connection prematurely closed BEFORE response" error at a random point in time. The pages contain quite a lot of data, which is processed afterwards (mostly DB updates and inserts). The error I get is:

**Reactor Netty: Connection prematurely closed BEFORE response with WebClient.**

The method in question is the following:

  private Mono<String> migrateSomeData() {
    return getAllPages()
        .flatMapIterable(Page::getItems)
        .filter(this::isValidItem)
        .doOnNext(item -> doSomeLogging())
        .filter(item -> this.checkIfAlreadyProcessed(item))
        .flatMap(this::mapToDto)
        .flatMap(this::persist)
        .doOnNext(this::saveAsProcessed)
        .collectList()
        .map(this::getTotalAmountOfProcessed);
  }

Certain parts of the code have been omitted for brevity. getAllPages uses the Flux .expand operator to recursively fetch the next pages. This is verified and works correctly. One more important thing to note is that all database operations are blocking, as we have not implemented reactive JDBC.

I've tried updating the webclient configuration:

  @Bean
  public WebClient someClient(
      ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {

    ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
        new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
    oauth.setDefaultClientRegistrationId("clientId");

    final int size = 16 * 1024 * 1024;
    final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);

    return WebClient.builder()
        .defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
        .defaultHeader("subscription-key", someKeyValue)
        .clientConnector(createWiretappedClientHttpConnector(this.getClass()))
        .filter(oauth)
        .exchangeStrategies(strategies)
        .build();
  }

  private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
    return ExchangeStrategies.builder()
        .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
        .build();
  }

  private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
    HttpClient httpClient =
        HttpClient.create()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .responseTimeout(Duration.ofMinutes(5))
            .doOnConnected(
                conn ->
                    conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
                        .addHandlerLast(new WriteTimeoutHandler(5 * 60)))
            .compress(true)
            .wiretap(
                invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
    return new ReactorClientHttpConnector(httpClient);
  }

I've also tried to narrow this code down as much as possible so forgive me for any structural mistakes.

I've tested this against a local mocked instance of the API in WireMock and the issue still persists, so that should rule out server configuration issues.

Is it possible that my processing code is too slow, so that the WebClient's response is not consumed in time and the WebClient decides to close the connection?


Solution

  • Sorry for the late reply. There is an issue about this raised on Reactor's GitHub page. The problem seems to be something like "the client sends [ACK] but no [ACK, FIN] and keeps the connection open. So the connection is not closed by the client and is later reused, resulting in this error".

    The solution should be to set a max-idle-time (the maximum time that a connection may stay idle in the connection pool). The suggested value is whatever the server you are calling uses as its keepAliveTimeout, but if that is unclear, try something small and experiment. So the solution should look like this:

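    import io.netty.channel.ChannelOption;
    import java.time.Duration;
    import reactor.netty.http.client.HttpClient;
    import reactor.netty.resources.ConnectionProvider;

    // Pool that evicts connections once they have been idle for too long
    ConnectionProvider connectionProviderWithMaxIdleTime =
        ConnectionProvider.builder("withMaxIdleTime")
            .maxIdleTime(Duration.ofSeconds(120)) // if the issue persists, reduce this
            .build();
    HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
        .option(ChannelOption.SO_KEEPALIVE, true)
        // ...the rest of your code...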
    

    Hope this helps!
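
To see why the relation between the pool's max-idle-time and the server's keep-alive timeout matters, the reasoning above can be sketched as a toy model in plain Java. This is only an illustration and not how Reactor Netty is implemented: the class IdleReuseModel, the Outcome enum, and the 60-second server keep-alive used in main are all hypothetical, chosen just to make the three cases visible.

```java
import java.time.Duration;

/** Toy model of idle-connection reuse; NOT Reactor Netty's actual implementation. */
class IdleReuseModel {

    enum Outcome { REUSED, FRESH_CONNECTION, PREMATURELY_CLOSED }

    /**
     * Decides what happens when a request picks up a pooled connection
     * that has been sitting idle for {@code idleFor}.
     */
    static Outcome nextRequest(Duration idleFor, Duration clientMaxIdle, Duration serverKeepAlive) {
        if (idleFor.compareTo(clientMaxIdle) >= 0) {
            // The client pool evicts the connection and opens a new one.
            return Outcome.FRESH_CONNECTION;
        }
        if (idleFor.compareTo(serverKeepAlive) >= 0) {
            // The server has already dropped the connection, but the client
            // still considers it usable and sends the request on it.
            return Outcome.PREMATURELY_CLOSED;
        }
        // Both sides still consider the connection alive.
        return Outcome.REUSED;
    }

    public static void main(String[] args) {
        Duration serverKeepAlive = Duration.ofSeconds(60); // assumed value
        Duration idleFor = Duration.ofSeconds(90);

        // Client max idle (120s) is longer than the server keep-alive (60s).
        System.out.println(nextRequest(idleFor, Duration.ofSeconds(120), serverKeepAlive));
        // prints PREMATURELY_CLOSED

        // Client max idle (30s) is shorter than the server keep-alive (60s).
        System.out.println(nextRequest(idleFor, Duration.ofSeconds(30), serverKeepAlive));
        // prints FRESH_CONNECTION
    }
}
```

In this model the error can only occur when the client is willing to keep a connection idle for longer than the server does, which is why reducing maxIdleTime is the suggested knob to turn.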