I'm dealing with an issue where I can't surface any kind of error or exception when my server is disconnected from my client. I want my code to detect that the client no longer has a connection to the server, but nothing I've tried works: doOnError, doOnCancel, and doFinally all fail to surface anything.
@Override
public <T> Flux<T> select(Query query, Class<T> entityClass) throws DataAccessException {
    return client.post()
        .uri("/query-stream")
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromValue("{\"sql\": \"" + sqlQuery + "\"}"))
        .retrieve()
        .bodyToFlux(String.class)
        // Drop the first element of the stream
        .skip(1)
        .<T>handle((jsonString, sink) -> handleJsonResponse(jsonString, entityClass, sink))
        .doOnError(this::handleError)
        .doOnCancel(() -> System.out.println("Cancel connection"))
        .doFinally(signalType -> {
            if (signalType == SignalType.CANCEL) {
                System.out.println("Cancel Signal");
            }
        });
}
ksqlDbEntityTemplate.select(null, String.class)
    .subscribe(
        System.out::println,
        error -> {
            // Handle errors
            error.printStackTrace();
            latch.countDown(); // Release the latch in case of an error.
        },
        latch::countDown
    );
Going with the TCP keep-alive mechanism seems to be the only option that works for us. Once the kernel's configured TCP keep-alive time is exceeded, an exception is thrown and we can act on it.
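For anyone landing here, this is a rough sketch of the socket options involved, written against plain JDK sockets (Java 11+) rather than our actual WebClient setup. The class name KeepAliveSketch, the helper enableKeepAlive, and the values 60/10/3 are made up for illustration. The per-socket timing options are platform dependent, so the sketch only applies them where the JDK reports support. Without them the kernel defaults apply, and on Linux the default idle time before the first probe is two hours.

```java
import java.io.IOException;
import java.net.Socket;
import java.net.SocketOption;
import java.util.LinkedHashMap;
import java.util.Map;
import jdk.net.ExtendedSocketOptions;

// Hypothetical helper class, not part of the original code.
final class KeepAliveSketch {

    /**
     * Turns on SO_KEEPALIVE and, where the platform supports it, tunes the
     * probe timing. Returns the options that were actually applied.
     */
    static Map<String, Object> enableKeepAlive(Socket socket,
                                               int idleSeconds,
                                               int intervalSeconds,
                                               int probeCount) throws IOException {
        Map<String, Object> applied = new LinkedHashMap<>();

        // Ask the kernel to probe the peer when the connection is idle.
        socket.setKeepAlive(true);
        applied.put("SO_KEEPALIVE", socket.getKeepAlive());

        // Idle time before the first probe, gap between probes, and how many
        // unanswered probes are tolerated before the connection is dropped.
        applyIfSupported(socket, ExtendedSocketOptions.TCP_KEEPIDLE, idleSeconds, applied);
        applyIfSupported(socket, ExtendedSocketOptions.TCP_KEEPINTERVAL, intervalSeconds, applied);
        applyIfSupported(socket, ExtendedSocketOptions.TCP_KEEPCOUNT, probeCount, applied);
        return applied;
    }

    private static void applyIfSupported(Socket socket,
                                         SocketOption<Integer> option,
                                         int value,
                                         Map<String, Object> applied) throws IOException {
        // Skip options this platform or JDK build does not expose.
        if (socket.supportedOptions().contains(option)) {
            socket.setOption(option, Integer.valueOf(value));
            applied.put(option.name(), socket.getOption(option));
        }
    }

    public static void main(String[] args) throws IOException {
        // An unconnected socket is enough to show which options get applied.
        try (Socket socket = new Socket()) {
            System.out.println(enableKeepAlive(socket, 60, 10, 3));
        }
    }
}
```

With WebClient on Reactor Netty, the equivalent would presumably go through the underlying HttpClient, for example HttpClient.create().option(ChannelOption.SO_KEEPALIVE, true), wrapped in a ReactorClientHttpConnector. I have not shown that here because which timing options are available depends on the transport (NIO vs. epoll) in use.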