Apologies if this is a bit ranty. I've hit a wall after several days of trying to understand and implement async concepts in my Quarkus app, so I would love some input.
A basic idea of the flow of the application:
I take in a stream of Kafka messages. The key of each message contains an endpoint of the relevant internal API, which I call on our network with a simple @GET. (Rarely I have to use a POST and so actually need the payload, but we'll keep it simple for now.) I have about 20 APIs that I call.
I have POJO classes that model the JSONs that are returned from these endpoints, and ideally, they get persisted into the appropriate Oracle table, after I modify/update them slightly.
I was able to make this entire flow work perfectly well with a straightforward transactional/blocking style, and was very pleased with it. The problem arose when I found that I could only get about 15,000 messages persisted per hour, and I really need to hit several times that.
I get really confused because there are a lot of very similar-seeming things in the Quarkus guides and they sort of run together for me. I keep bouncing from the Hibernate-Reactive-Panache to the Mutiny-Primer to the Hibernate-ORM to the Getting-Started-Reactive guides on quarkus.io.
So here's the general architecture I've been employing, based on a long-gone colleague's old app. I know I'm pretty bad at this since I just started, so sorry if it's terrible to look at:
KafkaConsumer parses key, calls appropriate service. Service uses an injected RestClient to make the call to the API, build the object, and attempt to persist.
KafkaConsumer:
package com.acme.processor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.acme.model.Message;
import com.acme.resource.KafkaKeyParser;
import com.acme.service.*;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import jakarta.inject.Inject;
import java.util.Map;

@Slf4j
@ApplicationScoped
public class KafkaMessageConsumer {

    @Inject
    AccountContactEmailService accountContactEmailService;

    @Incoming("test")
    public Uni<Void> consume(KafkaRecord<String, Message> record) {
        // I parse my keys and this works fine...
        if (httpEndpoint.contains("relevantInfo")) {
            accountContactEmailService.storeId(key1);
        }
        ...
        return Uni.createFrom().voidItem();
    }
}
I guess I've been trying to keep most of the logic and moving parts in services. AccountContactEmailService:
@ApplicationScoped
public class AccountContactEmailService {

    @Inject
    Mutiny.SessionFactory sessionFactory;

    @RestClient
    AccountContactEmailRestClient restClient;

    public Uni<Void> storeId(String key) {
        return restClient.getById(key)
            .onItem().transformToUni(data -> {
                AccountContactEmail accountContactEmail = data.getAccountContactEmail();
                return sessionFactory.withTransaction(
                    (session, transaction) -> session.persist(accountContactEmail)
                );
            })
            .onFailure().invoke(e -> {
                throw new RuntimeException(e);
            });
    }
}
And my RestClient:
@Path("/PrettySureMyPathingWorks")
@RegisterRestClient(configKey = "rest1")
@ClientBasicAuth(username = "user", password = "pass")
public interface AccountContactEmailRestClient {

    @GET
    Uni<AccountContactEmailResponse> getById(@QueryParam("id") String id);
}
My entities are annotated as entities and extend PanacheEntity, so I won't share them for the sake of brevity, but maybe the problem is there! If anyone needs those objects, I'll be happy to share them too, along with the pom/application properties.
Hopefully that made sense, and I appreciate anyone who has the inclination to help.
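The method .storeId returns a Uni<Void> that you are ignoring.
A Uni is lazy: it only describes the work, and nothing runs until something subscribes to it. Since the Uni from .storeId is never returned or subscribed to, the REST call and the persist never execute. That's why nothing happens.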
It depends on your application, but the code should look more like this:
@Incoming("test")
public Uni<Void> consume(KafkaRecord<String, Message> record) {
    // I parse my keys and this works fine...
    if (httpEndpoint.contains("relevantInfo")) {
        // You need to return the value here
        return accountContactEmailService
            .storeId(key1)
            // You can chain more Unis here, if you need to. For example: .chain( ... )
            ;
    }
    ...
    return Uni.createFrom().voidItem();
}
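
If it helps to see why the ignored Uni never runs, here is a minimal plain-Java sketch of the same mistake. It does not use Mutiny at all: LazyTask is a hypothetical stand-in I made up for a lazy pipeline, subscribe() stands in for the framework subscribing to whatever your @Incoming method returns, and the persisted list stands in for the database. Treat it as an illustration of the laziness, not as how Mutiny is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazy pipeline such as a Uni. NOT the Mutiny API.
final class LazyTask<T> {
    private final Supplier<T> work;

    private LazyTask(Supplier<T> work) {
        this.work = work;
    }

    // Only records the work; it does not run it.
    static <T> LazyTask<T> of(Supplier<T> work) {
        return new LazyTask<>(work);
    }

    // The work runs only when someone subscribes.
    T subscribe() {
        return work.get();
    }
}

final class LazyDemo {
    // Stand-in for the database table (assumption for this sketch).
    static final List<String> persisted = new ArrayList<>();

    // Mirrors storeId(key): describes the persist, but does not perform it.
    static LazyTask<Void> storeId(String key) {
        return LazyTask.of(() -> {
            persisted.add(key);
            return null;
        });
    }

    // Mirrors the original consume(): the task is built and then dropped.
    static LazyTask<Void> consumeIgnoring(String key) {
        storeId(key); // result discarded, so its work can never run
        return LazyTask.of(() -> null);
    }

    // Mirrors the corrected consume(): the task is handed back to the caller.
    static LazyTask<Void> consumeReturning(String key) {
        return storeId(key);
    }

    public static void main(String[] args) {
        consumeIgnoring("a").subscribe();
        System.out.println(persisted); // prints []

        consumeReturning("b").subscribe();
        System.out.println(persisted); // prints [b]
    }
}
```

The caller subscribes in both cases, but only the second version hands it a task that actually contains the persist. Returning the Uni from consume() works the same way.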