Tags: spring, spring-webflux, server-sent-events, retrywhen

Spring WebFlux SSE connection, retries and updating header on each retry


I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that identifies the last fully processed event. The server occasionally closes the connection, in which case I need to reconnect. On each retry, however, I need to update the value of the "Last-Event-ID" header.

The code below works, including the retries, but it does not update the value of the "Last-Event-ID" header before retrying. It always sends the value that was assigned initially.

Can anybody point me in the right direction on how to implement this correctly?

        ParameterizedTypeReference<ServerSentEvent<String>> type
                = new ParameterizedTypeReference<>() {
        };

        var req = webClient.get()
                .uri("/the/end/point");

        String lastEventId = loadLastId();
        if (lastEventId != null && !lastEventId.isEmpty()) {
            req = req.header("Last-Event-ID", lastEventId);
        }

        req
            .retrieve()
            .bodyToFlux(type)
            .retryWhen(Retry
                .backoff(
                    maxRetries,
                    Duration.ofSeconds(retryBackoffStep)
                )
                .maxBackoff(Duration.ofSeconds(10))
                .transientErrors(true)
                .filter(t -> {
                    log.warn("SSE Notification Channel: connection error detected", t);
                    return !(t instanceof ClientAuthorizationException)
                        && !(t instanceof WebClientResponseException.Forbidden)
                        && !(t instanceof WebClientResponseException.NotFound);
                })
                .doBeforeRetry(signal -> {
                    // How can I do the following?
                    // req.header("Last-Event-ID", loadLastId());
                    log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                                    signal.totalRetriesInARow() + 1);
                })
                .doAfterRetry(signal ->
                    log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                                    signal.totalRetriesInARow() + 1, signal.failure() == null))
                .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
                    log.error("SSE Notification Channel: SSE connection retries exhausted");
                    return signal.failure();
                })
            )
            .subscribe(
                this::handleContent,
                this::handleError,
                () -> log.error("SSE Notification Channel: SSE connection closed")
            );

Solution

  • Once a request is sent, you cannot modify it.

    In order to send a modified request (e.g. set a new header, change request body), you need to create a new request.

    Flux.defer() postpones building the request until the Flux is subscribed to.
    Because retryWhen resubscribes to its source on every retry, the supplier passed to Flux.defer() runs again each time and builds a new request with the current header value.
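
    The difference between building the request once and building it on every attempt can be seen without Reactor at all. The following is a minimal plain-Java sketch, not WebFlux code: the class DeferredRequestDemo, the header map standing in for a request, the "event-N" IDs and the connectWithRetries loop are all hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Plain-Java illustration only; no Reactor or WebClient involved.
class DeferredRequestDemo {

    // Hypothetical "connect and retry" loop: every attempt obtains a request
    // (modelled as a header map) from the factory, records the header it
    // would send, then pretends one more event was processed before the
    // server closed the connection.
    static List<String> connectWithRetries(Supplier<Map<String, String>> requestFactory,
                                           AtomicReference<String> lastIdStore,
                                           int attempts) {
        List<String> sentHeaders = new ArrayList<>();
        for (int attempt = 1; attempt <= attempts; attempt++) {
            Map<String, String> request = requestFactory.get();
            sentHeaders.add(request.get("Last-Event-ID"));
            lastIdStore.set("event-" + attempt);
        }
        return sentHeaders;
    }

    // Request built once up front, as in the question: every attempt reuses it.
    static List<String> eagerHeaders() {
        AtomicReference<String> store = new AtomicReference<>("event-0");
        Map<String, String> builtOnce = Map.of("Last-Event-ID", store.get());
        return connectWithRetries(() -> builtOnce, store, 3);
    }

    // Request built inside the supplier: every attempt re-reads the stored ID.
    static List<String> deferredHeaders() {
        AtomicReference<String> store = new AtomicReference<>("event-0");
        return connectWithRetries(() -> Map.of("Last-Event-ID", store.get()), store, 3);
    }

    public static void main(String[] args) {
        System.out.println("built once: " + eagerHeaders());    // [event-0, event-0, event-0]
        System.out.println("deferred:   " + deferredHeaders()); // [event-0, event-1, event-2]
    }
}
```

    In the WebFlux version, Flux.defer() plays the role of the Supplier, and each resubscription triggered by retryWhen plays the role of one loop iteration.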

    Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
        var req = webClient.get()
            .uri("/the/end/point");

        // Re-read the ID on every subscription, so each retry sends the latest value.
        String lastEventId = loadLastId();
        if (lastEventId != null && !lastEventId.isEmpty()) {
            req = req.header("Last-Event-ID", lastEventId);
        }

        return req.retrieve().bodyToFlux(type);
    });
    
    requestFlux
        .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep))