I have run into an issue when using Spring Boot with virtual threads and structured concurrency, HikariCP, PostgreSQL, and the AWS SQS SDK (and, obviously, a litany of other things). I can provide a POM on request, but I don't think it'll be helpful. My Postgres driver is up to date, as are all of my other dependencies.
The core issue: when running highly concurrent operations, I get I/O errors from HikariCP while trying to save items to my database. I'm not totally sure what to do.
**In my microservice's logs:**
2024-09-19T18:50:51.617-04:00 WARN 10060 --- [virtual-ingest] [ virtual-342] com.zaxxer.hikari.pool.ProxyConnection : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
**In the Docker logs for PG (repeated constantly):**
2024-09-19 22:50:51.807 UTC [412] LOG: unexpected EOF on client connection with an open transaction
My code (I haven't gone back to clean this up yet):
@EventListener(ApplicationReadyEvent.class)
public void startStructures()
{
    // One long-lived virtual thread per queue; if any queue loop fails, shut the rest down.
    try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
    {
        sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
            processQueueMessages(queue);
            return null;
        }));
        outerScope.join();
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt();
        log.error("Processing interrupted", e);
    }
}
private void processQueueMessages(String queue)
{
    while (true)
    {
        try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
        {
            List<Message> messages = retrieveMessages(queue);
            if (messages.isEmpty())
            {
                log.info("No messages in queue: {}, sleeping...", queue);
                Thread.sleep(5000);
            }
            else
            {
                // Fork one virtual thread per message (up to 10 per batch, per queue).
                for (Message message : messages)
                {
                    innerScope.fork(() -> {
                        processMessage(queue, message);
                        return null;
                    });
                }
                innerScope.join();
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.error("Queue processing interrupted", e);
            break;
        }
    }
}
public List<Message> retrieveMessages(String queue)
{
    var url = awsConfig.getBaseUrl() + queue;
    ReceiveMessageRequest request = ReceiveMessageRequest.builder()
            .queueUrl(url)
            .maxNumberOfMessages(10)
            .waitTimeSeconds(10) // long polling
            .build();
    ReceiveMessageResponse response = sqsClient.receiveMessage(request);
    return response.messages();
}
private void processMessage(String queue, Message message) throws JsonProcessingException
{
    log.info("Processing message from queue {}: {}", queue, message.messageId());
    JsonNode node = nodeBuilderService.buildNode(message);
    distributionService.handleSqsNotification(node);
    deleteMessage(queue, message);
}

private void deleteMessage(String queue, Message message)
{
    var url = awsConfig.getBaseUrl() + queue;
    sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));
    int remainingMessages = getMessageCount(queue);
    log.info("Deleted message: {}. Approximately {} messages remaining in the queue.",
            message.messageId(), remainingMessages);
}
public int getMessageCount(String queueName)
{
    var url = awsConfig.getBaseUrl() + queueName;
    GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
            .queueUrl(url)
            .attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
            .build();
    return Integer.parseInt(
            sqsClient.getQueueAttributes(request)
                    .attributes()
                    .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
    );
}
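To make the fan-out explicit: with Q queues, each batch can fork up to 10 message handlers, so up to 10 × Q processMessage calls can be in flight at once, and every one of them eventually needs its own database connection.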
While processing messages, my code eventually hits this method:
@Transactional
public void process(String filename, InputStream stream)
        throws IOException
{
    byte[] bytes = getBytesFromInputStream(stream, filename);
    List<SurfaceObservation> observations = decoder.beginSynopticDecoders(bytes, filename);
    repository.saveAllAndFlush(observations);
}
This is where the error occurs. Because the method is @Transactional, each virtual thread holds (or waits on) its own pooled connection for the duration of the transaction.
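For scale, note that Hikari's default maximum pool size is only 10 connections. These are the standard Spring Boot knobs (the values shown are Hikari's documented defaults, not my tuned config):

# HikariCP via Spring Boot; values shown are Hikari's defaults
spring.datasource.hikari.maximum-pool-size=10
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.max-lifetime=1800000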
I have tried forcing this onto a platform thread, but I believe this is related to HikariCP's limited support for virtual threads, even in current versions. I watched the JEP Café episodes on structured concurrency, and I'm wondering if anyone can help me answer a few questions.
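By "forcing this onto a platform thread" I mean something roughly like the following. This is a reconstruction from memory, not my exact code, and dbExecutor is a name I've made up here:

// Sketch: push the DB-bound work onto a small pool of platform threads so the
// number of concurrent transactions can never exceed the number of workers.
private final ExecutorService dbExecutor = Executors.newFixedThreadPool(10);

private void processMessage(String queue, Message message) throws Exception
{
    log.info("Processing message from queue {}: {}", queue, message.messageId());
    JsonNode node = nodeBuilderService.buildNode(message);
    // The calling virtual thread parks here while a platform thread runs the save.
    dbExecutor.submit(() -> {
        distributionService.handleSqsNotification(node);
        return null;
    }).get();
    deleteMessage(queue, message);
}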
I tried platform threads, and I tried a scope for just the DB transactions. I tried a lot of things over several hours and now have that code-brain where I'm not even sure what I tried. I even had ChatGPT take a look and, as expected, it could not help.
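One variant I do remember was gating the save behind a semaphore sized to match the connection pool, roughly like this (again a sketch; the names are made up):

// Sketch: cap in-flight transactions at the pool size so excess virtual threads
// wait in the JVM instead of timing out inside Hikari.
private final Semaphore dbPermits = new Semaphore(10); // match maximum-pool-size

private void saveWithPermit(List<SurfaceObservation> observations) throws InterruptedException
{
    dbPermits.acquire();
    try
    {
        repository.saveAllAndFlush(observations);
    }
    finally
    {
        dbPermits.release();
    }
}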
I am marking this as answered, as I am no longer experiencing the problem after implementing a Redis cache. I still believe Hikari has issues handling highly concurrent applications built on virtual threads, and I will watch to see whether a better solution comes up.
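For completeness, the shape of the fix, heavily simplified (the key naming and TTL here are illustrative, not my actual cache logic): duplicate work gets short-circuited in Redis, so far fewer requests ever reach Postgres.

// Illustrative sketch using Spring Data Redis; my real cache logic differs.
private final StringRedisTemplate redis;

private boolean alreadyProcessed(String messageId)
{
    // setIfAbsent returns true only for the first writer of the key.
    Boolean firstTime = redis.opsForValue()
            .setIfAbsent("sqs:seen:" + messageId, "1", Duration.ofHours(1));
    return !Boolean.TRUE.equals(firstTime);
}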