spring spring-webflux project-reactor server-sent-events retrywhen

Spring WebFlux SSE connection, update header on retry


I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that indicates the last fully processed event. Occasionally the server closes the connection, and I then need to retry it. On each retry, however, I need to update the value of the "Last-Event-ID" header.

The code below works, including the retries, but it does not update the value of the "Last-Event-ID" header before retrying. It always uses the value that was assigned initially.

Can anybody point me in the right direction on how to implement this correctly?

var type = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

var req = webClient.get()
    .uri("/the/end/point");

String lastEventId = loadLastId();
if (lastEventId != null && !lastEventId.isEmpty()) {
    req = req.header("Last-Event-ID", lastEventId);
}

req
    .retrieve()
    .bodyToFlux(type)
    .retryWhen(Retry
        .backoff(
            maxRetries,
            Duration.ofSeconds(retryBackoffStep)
        )
        .maxBackoff(Duration.ofSeconds(10))
        .transientErrors(true)
        .filter(t -> {
            log.warn("SSE Notification Channel: connection error detected", t);
            return !(t instanceof ClientAuthorizationException)
                && !(t instanceof WebClientResponseException.Forbidden)
                && !(t instanceof WebClientResponseException.NotFound);
        })
        .doBeforeRetry(signal -> {
            // How can I do the following?
            // req.header("Last-Event-ID", loadLastId());
            log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                            signal.totalRetriesInARow() + 1);
        })
        .doAfterRetry(signal ->
            log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                            signal.totalRetriesInARow() + 1, signal.failure() == null))
        .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
            log.error("SSE Notification Channel: SSE connection retries exhausted");
            return signal.failure();
        })
    )
    .subscribe(
        this::handleContent,
        this::handleError,
        () -> log.error("SSE Notification Channel: SSE connection closed")
    );

Solution

  • Once a request is sent, you cannot modify it.

    In order to send a modified request (e.g. set a new header, change request body), you need to create a new request.

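    Flux.defer() delays the request's creation until the Flux is subscribed to, and it runs its supplier again for every new subscription.
    Because retryWhen() retries by resubscribing, the supplier passed to Flux.defer() creates a new request with the current header value on each retry.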
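
To make the difference concrete, here is a minimal, library-free sketch of the same idea in plain Java. It does not use WebClient or Reactor: Request, IdStore and sendWithRetries are hypothetical stand-ins invented for illustration, and every attempt is assumed to fail so that a retry always happens. The eager variant builds the request once (like the code in the question), while the deferred variant builds it inside a Supplier that is called before each attempt (the role Flux.defer() plays in the answer).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class DeferredRequestSketch {

    // Hypothetical stand-in for an HTTP request; it only carries the header value.
    static final class Request {
        final String lastEventId;
        Request(String lastEventId) { this.lastEventId = lastEventId; }
    }

    // Hypothetical stand-in for loadLastId(): the stored ID advances on each call,
    // as it would while events are being processed between connection attempts.
    static final class IdStore {
        private int next = 1;
        String loadLastId() { return "id-" + next++; }
    }

    // Simulates a connection that fails on every attempt: asks the factory for a
    // request before each attempt and records the header value that was sent.
    public static List<String> sendWithRetries(Supplier<Request> requestFactory, int attempts) {
        List<String> sentIds = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Request request = requestFactory.get(); // evaluated once per attempt
            sentIds.add(request.lastEventId);
        }
        return sentIds;
    }

    // Eager: the request is built once, so every attempt reuses the first ID.
    public static List<String> eager(int attempts) {
        IdStore store = new IdStore();
        Request built = new Request(store.loadLastId());
        return sendWithRetries(() -> built, attempts);
    }

    // Deferred: the request is built inside the supplier, so each attempt reloads the ID.
    public static List<String> deferred(int attempts) {
        IdStore store = new IdStore();
        return sendWithRetries(() -> new Request(store.loadLastId()), attempts);
    }

    public static void main(String[] args) {
        System.out.println(eager(3));    // prints [id-1, id-1, id-1]
        System.out.println(deferred(3)); // prints [id-1, id-2, id-3]
    }
}
```

In the Reactor version, the resubscription triggered by retryWhen() plays the role of the loop here, and the supplier given to Flux.defer() plays the role of requestFactory.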

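    Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
        // Runs on every subscription, so each retry builds a fresh request.
        var req = webClient.get()
            .uri("/the/end/point");

        // Reload the ID so that a retry resumes from the last fully processed event.
        String lastEventId = loadLastId();
        if (lastEventId != null && !lastEventId.isEmpty()) {
            req = req.header("Last-Event-ID", lastEventId);
        }

        return req.retrieve().bodyToFlux(type);
    });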
    
    requestFlux
        .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep))