resteasy quarkus mutiny

How to build an async REST endpoint that calls a blocking action on a worker thread and replies instantly (Quarkus)


I checked the docs and Stack Overflow but didn't find a suitable approach. For example, this post seems very close: Dispatch a blocking service in a Reactive REST GET endpoint with Quarkus/Mutiny. However, I don't want so much unnecessary boilerplate code in my service; ideally, no service code change at all.

I just want to call a service method that uses the entity manager and is therefore blocking, while immediately returning a string such as "query started" to the caller. I don't need a callback object; it's a fire-and-forget approach.

I tried something like this

@NonBlocking
@POST
@Produces(MediaType.TEXT_PLAIN)
@Path("/query")
public Uni<String> triggerQuery() {
    return Uni.createFrom()
    .item("query started")
    .call(() -> service.startLongRunningQuery());
}

But it doesn't work. The error message returned to the caller is:

You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

I expected Quarkus to distribute the tasks accordingly, that is, the REST call to an IO thread and the blocking entity manager operations to a worker thread. So I must be using it wrong.

UPDATE:

I also tried a proposed workaround that I found in https://github.com/quarkusio/quarkus/issues/11535, changing the method body to:

return Uni.createFrom()
        .item("query started")
        .emitOn(Infrastructure.getDefaultWorkerPool())
        .invoke(()-> service.startLongRunningQuery());

Now I don't get an error, but service.startLongRunningQuery() is not invoked, so there are no logs and no query is actually sent to the DB.

Same with (How to call long running blocking void returning method with Mutiny reactive programming?):

return Uni.createFrom()
        .item(() -> service.startLongRunningQuery())
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());

Same with (How to run blocking codes on another thread and make http request return immediately):

ExecutorService executor = Executors.newFixedThreadPool(10, r -> new Thread(r, "CUSTOM_THREAD"));

return Uni.createFrom()
                .item(() -> service.startLongRunningQuery())
                .runSubscriptionOn(executor);

Any idea why service.startLongRunningQuery() is not called at all, and how to achieve fire-and-forget behaviour, assuming the REST call is handled by an IO thread and the service call by a worker thread?


Solution

  • It depends on whether you want to return immediately (before your startLongRunningQuery operation has actually run), or wait until the operation completes.

    In the first case, use something like:

    @Inject EventBus bus;
    
    @NonBlocking
    @POST
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/query")
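    public String triggerQuery() {
        // Hand the work off via the event bus; this call does not wait for the consumer
        bus.send("some-address", "my payload");
        return "query started";
    }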
    
    @Blocking // Will be called on a worker thread
    @ConsumeEvent("some-address")
    public void executeQuery(String payload) {
        service.startLongRunningQuery();
    }
    

    In the second case, you need to execute the query on a worker thread.

    @POST
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/query")
    public Uni<String> triggerQuery() {
       return Uni.createFrom().item(() -> service.startLongRunningQuery())
          .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
    }
    

    Note that you need RESTEasy Reactive for this to work (not classic RESTEasy). If you use classic RESTEasy, you need the quarkus-resteasy-mutiny extension (but I recommend RESTEasy Reactive; it is far more efficient).
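
To see the fire-and-forget idea in isolation, here is a minimal plain-JDK sketch. It is not Quarkus code and it is not the event bus approach shown above: a hand-made ExecutorService stands in for the Quarkus worker pool, and startLongRunningQuery() is a hypothetical placeholder that only sleeps. The class name, the latch, and the awaitCompletion helper are likewise assumptions made for illustration. It shows only the principle that the caller gets its reply while the blocking work is still running on another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FireAndForgetDemo {

    // Hypothetical stand-in for a worker pool (Quarkus manages its own pool).
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "demo-worker");
        t.setDaemon(true); // do not keep the JVM alive for background work
        return t;
    });

    // Released when the background task finishes; exists only so the demo can observe it.
    static final CountDownLatch done = new CountDownLatch(1);
    static volatile String workerThreadName;

    // Hypothetical placeholder for service.startLongRunningQuery().
    static void startLongRunningQuery() {
        try {
            Thread.sleep(200); // simulate a slow, blocking database call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        workerThreadName = Thread.currentThread().getName();
        done.countDown();
    }

    // Submits the blocking call to the pool and returns without waiting for it.
    static String triggerQuery() {
        WORKERS.execute(FireAndForgetDemo::startLongRunningQuery);
        return "query started";
    }

    // Waits up to timeoutMillis for the background task; returns true if it finished.
    static boolean awaitCompletion(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(triggerQuery());
        System.out.println("finished: " + awaitCompletion(2000) + " on " + workerThreadName);
    }
}
```

In a real Quarkus application you would normally let the framework dispatch the work (for example with the event bus consumer above) rather than creating your own pool, since a hand-made thread does not automatically get the transaction or request context that the entity manager may need.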