I am working on a Java consumer for ksqlDB that uses both pull and push queries.
Currently, a single push query can be streamed for a maximum of ten minutes at a time, after which the server closes the connection if the consumer is idle. Once the connection has timed out, we no longer receive changes to the streams or tables. Is there any way to keep this connection alive indefinitely?
One workaround is to ping the ksqlDB server every few minutes, but that holds the server's resources unnecessarily. See https://github.com/confluentinc/ksql/issues/6970
Alternatively, one can implement the onError method of a Reactive Streams Subscriber (for more details, refer to https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/) and put the re-subscription logic there. When the connection times out, the exception is delivered to onError, which re-subscribes to the push query, so a change that occurs in the ksqlDB server even a few hours later is still received.
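    // Called by the publisher when the stream fails, e.g. after a connection timeout.
    @Override
    public synchronized void onError(Throwable t) {
        // Pass the Throwable itself so the stack trace is logged too.
        logger.error("Received an error in onError method", t);
        // Re-subscribe so that later changes are still delivered.
        getPushQueryResult();
    }

    public void getPushQueryResult() {
        try {
            // "query" is a placeholder for the actual push query statement.
            client.streamQuery("query").thenAccept(streamedQueryResult -> {
                logger.info("Query has started. Query ID: " + streamedQueryResult.queryID());
                // Use a fresh subscriber for every new stream.
                RowSubscriber subscriber = new RowSubscriber();
                streamedQueryResult.subscribe(subscriber);
            }).exceptionally(e -> {
                // The asynchronous request itself failed (e.g. server unreachable).
                logger.error("Request failed", e);
                return null;
            });
        } catch (Exception e) {
            logger.error("Exception in getPushQueryResult method", e);
        }
    }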
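One caveat with calling getPushQueryResult() directly from onError: if the stream keeps failing shortly after each re-subscription, the retries follow one another without any pause. Below is a minimal sketch of delaying the retry with a capped exponential backoff. ResubscribeBackoff and its method names are hypothetical helpers, not part of the ksqlDB Java client, and the delay values are only illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the ksqlDB client API): schedules a
// re-subscription after a delay that doubles on every call, up to a cap.
final class ResubscribeBackoff {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long nextDelayMs;

    // Daemon thread so a pending retry does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "resubscribe-backoff");
                t.setDaemon(true);
                return t;
            });

    ResubscribeBackoff(long initialDelayMs, long maxDelayMs) {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("need 0 < initialDelayMs <= maxDelayMs");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.nextDelayMs = initialDelayMs;
    }

    // Returns the delay to use now, then doubles the stored delay (capped).
    synchronized long nextDelayMillis() {
        long current = nextDelayMs;
        nextDelayMs = Math.min(maxDelayMs, nextDelayMs * 2);
        return current;
    }

    // Call once rows are flowing again (e.g. from onNext) to start over.
    synchronized void reset() {
        nextDelayMs = initialDelayMs;
    }

    // Runs the given re-subscription action after the current backoff delay.
    void scheduleResubscribe(Runnable resubscribe) {
        scheduler.schedule(resubscribe, nextDelayMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Assuming a shared instance such as new ResubscribeBackoff(1000, 60000), onError could call backoff.scheduleResubscribe(this::getPushQueryResult) instead of calling getPushQueryResult() inline, and onNext could call backoff.reset(). This only spaces out the retries; it does not prevent the server from closing an idle connection.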