I'm trying to implement a thread-per-request design using Vert.x Web's HTTP server. When I run the following code, I am not able to process any more requests until response.end() is called:
final var server = vertx.createHttpServer(serverOptions).requestHandler(req -> {
    // workerPool is a thread pool with 10 threads
    workerPool.execute(() -> {
        var response = req.response();
        longRunningTask();
        response.end("Done.");
    });
});
I confirmed the culprit was response.end() by changing my code to the following:
final var server = vertx.createHttpServer(serverOptions).requestHandler(req -> {
    // workerPool is a thread pool with 10 threads
    var response = req.response();
    workerPool.execute(() -> {
        longRunningTask();
    });
    response.end("Done.");
});
But this isn't thread-per-request, as I'm just offloading longRunningTask() to a background thread and quickly dealing with requests on the main thread.
I also tried:
- Vertx#executeBlocking()
- setting workerPoolSize to 5
- setting eventLoopPoolSize to 5
and I got a similar result.
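For reference, this is roughly how I set those pool sizes (via VertxOptions):
var options = new VertxOptions()
    .setWorkerPoolSize(5)     // Vert.x internal worker pool
    .setEventLoopPoolSize(5); // number of event loop threads
var vertx = Vertx.vertx(options);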
I'm on Java 17 and Vert.x Web 4.4.1, where it seems virtual threads and configuring Vert.x's threading model are not available. Any idea how to work around this?
I assume the workerPool in your code is an instance of ExecutorService. That is a completely different thread scheduler than Vert.x's. Mixing the two schedulers (Vert.x and the ExecutorService) will almost certainly lead to concurrency bugs.
For the same reason and because it would be wasteful, you should not try to build a thread-per-request server on top of Vert.x.
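That said, if you have to keep an existing ExecutorService around, at least don't complete the response from one of its threads; hop back onto the Vert.x context that owns the request first. A minimal sketch of that idea, reusing the names from your question:
httpServer.requestHandler(req -> {
    // Capture the event-loop context that is handling this request.
    var ctx = vertx.getOrCreateContext();
    workerPool.execute(() -> {
        longRunningTask(); // blocking work on the external pool
        // Complete the response back on the Vert.x context, not on the pool thread.
        ctx.runOnContext(v -> req.response().end("Done."));
    });
});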
If it's only about avoiding Vert.x's Future... your options on JVM 17 are Kotlin with coroutines or one of the reactive libraries like RxJava or Mutiny. However, these will not magically turn async code into sync code either. Imho, Vert.x's Future is a nice abstraction of what is actually going on, and I'd stick with it.
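To give a concrete sense of that abstraction: every asynchronous Vert.x call in the examples below returns a Future, and both the success and the failure path compose on it. A minimal sketch, where doSomethingAsync() is a hypothetical placeholder for any of those calls:
doSomethingAsync() // Future<Void>, hypothetical placeholder
    .onSuccess(v -> req.response().end("Done."))
    .onFailure(err -> req.response()
        .setStatusCode(500)
        .end("Task failed: " + err.getMessage()));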
Let's look at how to get your longRunningTask() to work in a Vert.x application. There are three ways to make Vert.x return an HTTP response after longRunningTask() has finished:
vertx.executeBlocking()
You can offload any blocking code to Vert.x's worker thread pool:
httpServer.requestHandler(req ->
    vertx
        .<Void>executeBlocking(promise -> {
            longRunningTask();
            promise.complete();
        }) // Future<Void>
        .onSuccess(__ -> req.response().end("Done."))
);
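By default, executeBlocking calls scheduled from the same context are executed one after another, in order. If the long-running tasks are independent and may run in parallel on the worker pool, pass ordered = false:
httpServer.requestHandler(req ->
    vertx
        .<Void>executeBlocking(promise -> {
            longRunningTask();
            promise.complete();
        }, false) // ordered = false: executions may run concurrently
        .onSuccess(__ -> req.response().end("Done."))
);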
WorkerExecutor
You can create a separate shared worker pool that is compatible with Vert.x. The name that you assign to it identifies the worker pool in your application, so you can grab it by that name from other places as well:
httpServer.requestHandler(req ->
    vertx
        .createSharedWorkerExecutor("slow-tasks", 10) // 10 threads
        .<Void>executeBlocking(promise -> {
            longRunningTask();
            promise.complete();
        }) // Future<Void>
        .onSuccess(__ -> req.response().end("Done."))
);
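Because the pool is identified by its name, you can also create the WorkerExecutor once at startup and reuse it instead of looking it up per request; call close() on it when you no longer need it:
// Created once, e.g. in a Verticle's start() method.
var slowTasks = vertx.createSharedWorkerExecutor("slow-tasks", 10);

httpServer.requestHandler(req ->
    slowTasks
        .<Void>executeBlocking(promise -> {
            longRunningTask();
            promise.complete();
        })
        .onSuccess(__ -> req.response().end("Done."))
);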
Worker Verticle
Worker Verticles can make sense when you want to share a blocking operation between all Verticles or even in a cluster. They are programmed just like normal Verticles. However, they are deployed with new DeploymentOptions().setWorker(true).
This could be your Worker Verticle:
public final class LongRunningTaskVerticle extends AbstractVerticle {
    @Override
    public void start(Promise<Void> startPromise) {
        vertx
            .eventBus()
            .consumer("org.me.longrunning") // event bus address
            .handler(msg -> {
                longRunningTask();
                msg.reply("ok");
            });
        startPromise.complete(); // signal that deployment has finished
    }
}
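Deploying it could look like this; the instance count of 10 simply mirrors the pool size from your question:
vertx.deployVerticle(
    LongRunningTaskVerticle.class.getName(),
    new DeploymentOptions()
        .setWorker(true)   // run the verticle's handlers on the worker pool
        .setInstances(10)  // up to 10 messages processed concurrently
);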
Calling the Worker Verticle from an HTTP request handler looks like this:
httpServer.requestHandler(req ->
    vertx
        .eventBus()
        .<String>request("org.me.longrunning", "") // Future<Message<String>>
        .onSuccess(__ -> req.response().end("Done."))
);
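One caveat: an event bus request has a default reply timeout of 30 seconds. If longRunningTask() can take longer than that, raise the timeout explicitly via DeliveryOptions (the 5 minutes here is just an example):
httpServer.requestHandler(req ->
    vertx
        .eventBus()
        .<String>request(
            "org.me.longrunning",
            "",
            new DeliveryOptions().setSendTimeout(300_000)) // reply timeout in ms
        .onSuccess(__ -> req.response().end("Done."))
);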