My program works with a faulty readonly PostgreSQL database. It uses Hibernate 6 to retrieve relatively big results from the DB. For that reason I call the Query
method getResultStream()
instead of getResultList()
to get uncached forward-only results.
When a DB fails it's always the JDBC executeQuery()
call and processing an already opened ResultSet never fails.
I wanted to add retries in my code, but the error happens only after I return the Result Stream to 3rd-party code because the Result Stream is deferred and only calls JDBC when somebody invokes a terminal operation on the stream:
org.hibernate.exception.LockAcquisitionException
caused by
org.postgresql.util.PSQLException
...
at org.postgresql.jdbc.PgPreparedStatement.executeQuery
...
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
...
at java.base/java.util.stream.ReferencePipeline.count(Unknown Source)
at com.thirdparty.Program.main
How can I force the early call to JDBC executeQuery()
and return the JPA Result Stream with unconsumed results?
Here's my code:
LOGGER.debug("JPQL (" + offset + "," + pagesize + "):\n" + jpqlQuery);
TypedQuery<Map<String, Object>> builder = queryBuilder(jpqlQuery);
return builder.setFirstResult(offset).setMaxResults(pagesize).getResultStream();
I created a custom Spliterator that reconnects and repeats the query when it fails to get the first result.
<T> Stream<T> doIt(String jpqlQuery, int offset, int pagesize) {
FailoverSpliterator<T> failoverSpliterator = new FailoverSpliterator<>(() -> {
TypedQuery<T> qb = queryBuilder(jpqlQuery);
return qb.setFirstResult(offset).setMaxResults(pagesize).getResultStream();
});
return StreamSupport
.stream(failoverSpliterator, false)
.onClose(failoverSpliterator::close);
}
public class FailoverSpliterator<T> implements Spliterator<T> {
final Supplier<Stream<T>> delegateSupplier;
Spliterator<T> confirmedDelegate;
Stream<T> lastStream;
public FailoverSpliterator(Supplier<Stream<T>> delegateSupplier) {
this.delegateSupplier = delegateSupplier;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (confirmedDelegate != null) {
return confirmedDelegate.tryAdvance(action);
}
if (action == null) {
throw new NullPointerException();
}
@SuppressWarnings("unchecked")
T[] holder = (T[]) new Object[1];
for (int i = 0;; i++) {
final Spliterator<T> tmp;
final boolean res;
try {
lastStream = delegateSupplier.get();
try {
tmp = lastStream.spliterator();
res = tmp.tryAdvance(x -> {
holder[0] = x;
});
} catch (Throwable /* NOSONAR */ e) {
lastStream = closeQuietly(lastStream, e);
throw e;
}
} catch (Exception e) {
exceptionalReconnect(e);
if (i >= MAX_FAILOVER_RETRIES) {
throw e;
}
LOGGER.error("Failed to get the first element of JPA Result Stream: " + e, e);
LOGGER.warn("retrying...");
continue;
}
confirmedDelegate = tmp;
if (res) {
action.accept(holder[0]);
}
return res;
}
}
public void close() {
close(null);
}
void close(Throwable consumer) {
lastStream = closeQuietly(lastStream, consumer);
}
@Override
public int characteristics() {
return 0;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}