vert.x

How do I implement a thread-per-request model using Vert.x Web's HTTP server on Vert.x 4.4.1?


I'm trying to implement a thread-per-request design using Vert.x Web's HTTP server. When I run the following code, I am not able to process any more requests until response.end() is called:

final var server = vertx.createHttpServer(serverOptions).requestHandler(req -> {
    // workerPool is a thread pool with 10 threads
    workerPool.execute(() -> {
        var response = req.response();
        longRunningTask();
        response.end("Done.");
    });
});

I confirmed the culprit was response.end() by changing my code to the following:

final var server = vertx.createHttpServer(serverOptions).requestHandler(req -> {
    // workerPool is a thread pool with 10 threads
    var response = req.response();
    workerPool.execute(() -> {
        longRunningTask();
    });
    response.end("Done.");
});

But this isn't thread-per-request as I'm just offloading longRunningTask() to a background thread, and quickly dealing with requests on the main thread.

I also tried:

and I got a similar result.

I'm on Java 17 and I'm using Vert.x Web 4.4.1, where, as far as I can tell, neither virtual threads nor a configurable threading model are available. Any idea how to work around this?


Solution

  • I'm assuming the workerPool in your code is an instance of ExecutorService. That is a completely different thread scheduler from the one Vert.x uses. Mixing the two schedulers (Vert.x, ExecutorService) will almost certainly lead to concurrency bugs.

    For the same reason, and because it would be wasteful, you should not try to build a thread-per-request server on top of Vert.x.

    If this is only about avoiding Vert.x's Future, your options on Java 17 are Kotlin with coroutines or one of the reactive libraries such as RxJava or Mutiny. However, these will not magically turn async code into sync code either. In my opinion, Vert.x's Future is a nice abstraction of what is actually going on, so I'll stick with it here.

    Let's look at how to get your longRunningTask() to work in a Vert.x application. There are three ways to make Vert.x return an HTTP response after longRunningTask() has finished:

    Option 1: vertx.executeBlocking()

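    You can offload blocking code to Vert.x's worker thread pool. On 4.4.x, executeBlocking() takes a handler that completes a Promise; as far as I know, the overload that accepts a plain Callable only arrived in 4.5:

    httpServer.requestHandler(req ->
        vertx
            .<Void>executeBlocking(promise -> {
                longRunningTask();   // runs on a worker thread
                promise.complete();  // signals that the blocking work is done
            })                       // Future<Void>
            .onSuccess(__ -> req.response().end("Done."))
    );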
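    To make the hand-off concrete, here is a JDK-only sketch of the pattern that executeBlocking() follows: the blocking call runs on a worker thread, and the response is completed back on the event-loop thread. It is a model under stated assumptions, not Vert.x's implementation; OffloadModel, EVENT_LOOP, WORKERS and this version of longRunningTask() are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** JDK-only model of "block on a worker, finish on the event loop". Hypothetical names throughout. */
final class OffloadModel {

    // Stand-in for the single event-loop thread that accepts requests.
    static final ExecutorService EVENT_LOOP =
        Executors.newSingleThreadExecutor(r -> daemon(r, "event-loop"));

    // Stand-in for the worker pool, whose threads are allowed to block.
    static final ExecutorService WORKERS =
        Executors.newFixedThreadPool(10, r -> daemon(r, "worker"));

    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    }

    /** Hypothetical blocking task; returns the name of the thread it ran on. */
    static String longRunningTask() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    /** Handles one "request": the task blocks a worker, the result is produced on the event loop. */
    static CompletableFuture<String> handle() {
        return CompletableFuture
            .supplyAsync(OffloadModel::longRunningTask, WORKERS)
            .thenApplyAsync(
                workerName -> workerName + " -> " + Thread.currentThread().getName(),
                EVENT_LOOP);
    }
}
```

    Because the event-loop thread only schedules the task and later runs the short completion step, it stays free to accept further requests while the workers are blocked.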
    

    Option 2: Use a shared WorkerExecutor

    You can create a separate, named worker pool that is managed by Vert.x. The name identifies the pool within your application, so other code can obtain the same pool by that name. Create the executor once rather than on every request:

    WorkerExecutor slowTasks = vertx.createSharedWorkerExecutor("slow-tasks", 10); // 10 threads

    httpServer.requestHandler(req ->
        slowTasks
            .<Void>executeBlocking(promise -> {
                longRunningTask();
                promise.complete();
            })                                            // Future<Void>
            .onSuccess(__ -> req.response().end("Done."))
    );
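    The idea of "a pool identified by its name" can be sketched with plain JDK classes. This is only a model of the sharing behaviour described above, not how Vert.x implements it; NamedPools and shared() are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** JDK-only model of worker pools that are looked up by name. Hypothetical names throughout. */
final class NamedPools {

    private static final Map<String, ExecutorService> POOLS = new ConcurrentHashMap<>();

    /**
     * Returns the pool registered under the given name, creating it on first use.
     * In this sketch the size only matters for the call that creates the pool.
     */
    static ExecutorService shared(String name, int size) {
        return POOLS.computeIfAbsent(name, n ->
            Executors.newFixedThreadPool(size, r -> {
                Thread t = new Thread(r, n);
                t.setDaemon(true); // do not keep the JVM alive for this sketch
                return t;
            }));
    }
}
```

    Any caller that asks for "slow-tasks" gets the same bounded pool, so the total number of threads spent on slow work stays capped no matter how many places submit to it.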
    

    Option 3: Use a Worker Verticle

    Worker Verticles can make sense when you want to share a blocking operation between all Verticles, or even across a cluster. They are programmed just like normal Verticles, but they are deployed with new DeploymentOptions().setWorker(true), so their handlers run on worker threads instead of an event loop.

    Worker Verticle

    This could be your Worker Verticle:

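    public final class LongRunningTaskVerticle extends AbstractVerticle {

        // The synchronous start() is used here: the start(Promise) variant must
        // complete its promise, otherwise the deployment never finishes.
        @Override
        public void start() {
            vertx
                .eventBus()
                .consumer("org.me.longrunning") // event bus address
                .handler(msg -> {
                    longRunningTask();          // blocking is fine on a worker thread
                    msg.reply("ok");
                });
        }
    }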
    

    Call it from an HTTP request handler

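    An HTTP request handler can then call the Worker Verticle over the event bus. Note that request() fails if no reply arrives within the send timeout, which defaults to 30 seconds: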

    httpServer.requestHandler(req ->
        vertx
            .eventBus()
            .<String> request("org.me.longrunning", "")   // Future<Message<String>>
            .onSuccess(__ -> req.response().end("Done."))
    );