I establish a Server-Sent Events (SSE) connection using the code below. The GET request must carry a "Last-Event-ID" HTTP header that indicates the last fully processed event. Occasionally the server closes the connection, in which case I need to reconnect. On each retry, however, I need to update the value of the "Last-Event-ID" header.
The code below works, including the retries, but it does not update the value of the "Last-Event-ID" header before retrying. It always uses the value that was assigned initially.
Can anybody point me in the right direction on how to implement this correctly?
    ParameterizedTypeReference<ServerSentEvent<String>> type =
            new ParameterizedTypeReference<>() {};

    var req = webClient.get()
            .uri("/the/end/point");

    // The header value is read only once, when the request is built
    String lastEventId = loadLastId();
    if (lastEventId != null && !lastEventId.isEmpty()) {
        req = req.header("Last-Event-ID", lastEventId);
    }

    req
        .retrieve()
        .bodyToFlux(type)
        .retryWhen(Retry
            .backoff(
                maxRetries,
                Duration.ofSeconds(retryBackoffStep)
            )
            .maxBackoff(Duration.ofSeconds(10))
            .transientErrors(true)
            .filter(t -> {
                log.warn("SSE Notification Channel: connection error detected", t);
                return !(t instanceof ClientAuthorizationException)
                    && !(t instanceof WebClientResponseException.Forbidden)
                    && !(t instanceof WebClientResponseException.NotFound);
            })
            .doBeforeRetry(signal -> {
                // How can I do the following?
                // req.header("Last-Event-ID", loadLastId());
                log.warn("SSE Notification Channel: retry SSE connection, attempt {}",
                    signal.totalRetriesInARow() + 1);
            })
            .doAfterRetry(signal ->
                log.warn("SSE Notification Channel: retried SSE connection, attempt {}, successful = {}",
                    signal.totalRetriesInARow() + 1, signal.failure() == null))
            .onRetryExhaustedThrow((retryBackoffSpec, signal) -> {
                log.error("SSE Notification Channel: SSE connection retries exhausted");
                return signal.failure();
            })
        )
        .subscribe(
            this::handleContent,
            this::handleError,
            () -> log.error("SSE Notification Channel: SSE connection closed")
        );
Once a request is sent, you cannot modify it.
To send a modified request (e.g. with a new header or a different request body), you need to create a new request.
Flux.defer() defers (delays) the request's creation until the Flux is subscribed to. Because retryWhen() resubscribes to the source on every retry, the supplier passed to Flux.defer() runs again each time the request has to be resent and creates a new request with the current header value.
    Flux<ServerSentEvent<String>> requestFlux = Flux.defer(() -> {
        // Runs on every subscription, so each retry re-reads the last event ID
        var req = webClient.get()
                .uri("/the/end/point");
        String lastEventId = loadLastId();
        if (lastEventId != null && !lastEventId.isEmpty()) {
            req = req.header("Last-Event-ID", lastEventId);
        }
        return req.retrieve().bodyToFlux(type);
    });

    requestFlux
        .retryWhen(Retry.backoff(maxRetries, Duration.ofSeconds(retryBackoffStep))
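To see why deferring matters, here is a minimal plain-Java sketch of the same idea without Reactor or WebClient. It is only an analogy: the class DeferredRequestSketch, the lastEventId field, and the send() helper are hypothetical stand-ins, and the loop plays the role that resubscription plays in retryWhen(). The eager variant reads the header once, as in the question. The deferred variant calls a Supplier on every attempt, in the way the supplier passed to Flux.defer() is called on every (re)subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java analogy (no Reactor): why the request must be rebuilt per attempt.
final class DeferredRequestSketch {

    // Hypothetical stand-in for the persisted "last fully processed event" ID.
    static String lastEventId = "1";

    // Hypothetical stand-in for loadLastId() from the question.
    static String loadLastId() {
        return lastEventId;
    }

    // Simulates one connection attempt: records the header that was sent,
    // then pretends one more event was processed before the connection dropped.
    static void send(String header, List<String> sentHeaders) {
        sentHeaders.add(header);
        lastEventId = String.valueOf(Integer.parseInt(lastEventId) + 1);
    }

    // Eager: the header is read once, so every attempt reuses the initial value.
    static List<String> eager(int attempts) {
        List<String> sent = new ArrayList<>();
        String header = loadLastId();
        for (int i = 0; i < attempts; i++) {
            send(header, sent);
        }
        return sent;
    }

    // Deferred: the supplier is invoked for each attempt, so each attempt
    // picks up the value that is current at that moment.
    static List<String> deferred(int attempts) {
        List<String> sent = new ArrayList<>();
        Supplier<String> headerSupplier = DeferredRequestSketch::loadLastId;
        for (int i = 0; i < attempts; i++) {
            send(headerSupplier.get(), sent);
        }
        return sent;
    }

    public static void main(String[] args) {
        lastEventId = "1";
        System.out.println("eager:    " + eager(3));    // eager:    [1, 1, 1]
        lastEventId = "1";
        System.out.println("deferred: " + deferred(3)); // deferred: [1, 2, 3]
    }
}
```

The eager run sends the stale value on every attempt, which matches the behaviour described in the question. The deferred run sends a fresh value each time, which is what wrapping the request construction in Flux.defer() is meant to achieve.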