So I'm working on a project that can dynamically create and add new listeners on Kafka brokers.
Basically, each instance of the application watches a prefix on etcd, such as `/instance-1/tasks`, and a manager application rebalances the workload across multiple instances by putting new tasks into etcd keys under that prefix. For example, the manager puts the following into etcd: key `/instance-1/tasks/f348b447-012e-425d-893b-bca3086ebf67`, value `some-basic-config-for-that-task`.
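To make the key layout concrete, here is a minimal sketch that uses a plain `Map` as a hypothetical in-memory stand-in for the etcd keyspace (no etcd client API is involved) and extracts the task IDs assigned to one instance from its prefix. `TaskKeys`, `tasksFor` and the second key are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the etcd keyspace; no etcd client is used.
public class TaskKeys {

    // Returns taskId -> task config for every key directly under the given prefix,
    // e.g. prefix "/instance-1/tasks/" and key "/instance-1/tasks/<uuid>" -> "<uuid>".
    static Map<String, String> tasksFor(String prefix, Map<String, String> keyspace) {
        Map<String, String> tasks = new TreeMap<>();
        for (Map.Entry<String, String> e : keyspace.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                tasks.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, String> keyspace = Map.of(
            "/instance-1/tasks/f348b447-012e-425d-893b-bca3086ebf67", "some-basic-config-for-that-task",
            "/instance-2/tasks/hypothetical-task-id", "config-for-instance-2");
        // Only the task under /instance-1/tasks/ belongs to instance 1
        System.out.println(tasksFor("/instance-1/tasks/", keyspace));
    }
}
```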
When an instance detects that a new task has been added under its prefix, it looks up the Kafka broker configuration in a config data store and then creates new "listeners" to that Kafka broker, one for each partition the configuration specifies.
Each listener basically runs an infinite loop that can be interrupted by calling a `close()` method, which sets a flag telling the loop to stop on its next iteration.
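A minimal sketch of that stop-flag pattern, using only the JDK (21+ for `Thread.ofVirtual()`). `StoppableLoop` is a hypothetical name and `Thread.sleep(10)` stands in for one `poll()` call. The flag is `volatile` so that the write made by `close()` on one thread is guaranteed to be seen by the looping thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the "stop on the next iteration" pattern; names are hypothetical.
public class StoppableLoop implements Runnable {

    // volatile: close() runs on another thread, and the loop must see the update
    private volatile boolean closeCalled = false;
    final AtomicInteger iterations = new AtomicInteger();

    @Override
    public void run() {
        while (!closeCalled) {
            iterations.incrementAndGet();
            try {
                Thread.sleep(10); // stands in for one poll() of the real listener
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                return;
            }
        }
    }

    public void close() {
        closeCalled = true; // the loop exits once the current iteration finishes
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableLoop loop = new StoppableLoop();
        Thread t = Thread.ofVirtual().start(loop);
        Thread.sleep(100);
        loop.close();
        t.join(); // returns once the loop has observed the flag
        System.out.println("stopped after " + loop.iterations.get() + " iterations");
    }
}
```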
For example, if the configuration is `bootstrapServers 1.1.1.1:9092 ; numberOfPartitions 3`, then 3 listeners (i.e. 3 threads) will be created, one per partition. Each listener basically polls Kafka every X seconds.
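The one-listener-per-partition fan-out can be sketched as follows, assuming a hypothetical `Listener` stand-in instead of the real Kafka listener and reusing the `"virtual-thread-" + id` naming from the question. Each listener gets its own virtual thread; shutdown is `close()` on every listener followed by `join()` on every thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: one listener thread per partition ("numberOfPartitions 3" -> 3 listeners).
public class PerPartitionListeners {

    // Hypothetical stand-in for the real Kafka listener
    static final class Listener implements Runnable {
        private final int partition;
        private final Set<Integer> started;
        private volatile boolean closeCalled = false;

        Listener(int partition, Set<Integer> started) {
            this.partition = partition;
            this.started = started;
        }

        @Override
        public void run() {
            started.add(partition); // record that this partition's loop began
            while (!closeCalled) {
                try {
                    Thread.sleep(10); // stands in for kafkaConsumer.poll(...)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void close() { closeCalled = true; }
    }

    static Set<Integer> runListeners(int numberOfPartitions) throws InterruptedException {
        Set<Integer> started = ConcurrentHashMap.newKeySet();
        List<Listener> listeners = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < numberOfPartitions; p++) {
            Listener listener = new Listener(p, started);
            listeners.add(listener);
            // Same naming scheme as in the question: "virtual-thread-" + id
            threads.add(Thread.ofVirtual().name("virtual-thread-" + p).start(listener));
        }
        Thread.sleep(100);                  // let the listeners run for a moment
        listeners.forEach(Listener::close); // ask every loop to stop
        for (Thread t : threads) t.join();  // wait until each loop has exited
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("partitions started: " + runListeners(3));
    }
}
```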
Since virtual threads were released in the latest LTS version (Java 21), the team has been migrating from platform threads (`new Thread()`) to virtual threads.
The listener code looks like this (a very reduced version):
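```java
import java.io.IOException;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Inside the listener class (reporter, kafkaConsumer and testConnectivityWithHost() omitted).
// volatile: close() is called from another thread, so the write must be visible to the loop.
private volatile boolean closeCalled = false;
private boolean couldConnectToHost = false;
private long lastHostConnectivityTest = 0L;

protected void doStart() {
    long hostConnectivityTestRate = 5000L;
    try {
        while (!closeCalled) {
            long currentTime = System.currentTimeMillis();
            // Re-test connectivity at most once every hostConnectivityTestRate ms
            if (lastHostConnectivityTest < currentTime - hostConnectivityTestRate) {
                ConnectivityTestResult res = testConnectivityWithHost();
                this.lastHostConnectivityTest = currentTime;
                this.couldConnectToHost = res.wasAbleToConnect();
                this.reporter.report(res);
            }
            if (this.couldConnectToHost) {
                ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofSeconds(20));
                // DO MORE WORK...
            } else {
                Thread.sleep(hostConnectivityTestRate); // prevents busy-spin loop
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // sleep was interrupted: restore the flag and stop
    } finally {
        this.kafkaConsumer.close(); // always release the consumer when the loop ends
    }
}

public void close() throws IOException {
    this.closeCalled = true; // the loop exits after its current iteration
}
```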
The `doStart()` method runs inside a virtual thread: `Thread.ofVirtual().name("virtual-thread-" + id).start(connector::doStart)`.
I was reading about virtual threads and, OK, I understand the concept: a virtual thread still needs an underlying carrier thread to execute. They seem useful for a lot of high-throughput scenarios.
> Virtual threads are suitable for running tasks that spend most of the time blocked, often waiting for I/O operations to complete. However, they aren't intended for long-running CPU-intensive operations.
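The kind of workload that quote describes can be illustrated with the usual "many tasks that mostly block" pattern. In this sketch (class and method names are made up), 10,000 tasks that each sleep for one second should finish in roughly one second of wall-clock time on a virtual-thread-per-task executor, because a blocked virtual thread releases its carrier thread. Exact timings depend on the machine.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

// Sketch: virtual threads pay off when tasks spend most of their time blocked.
public class BlockingTasksDemo {

    // Runs `tasks` blocking tasks on virtual threads and returns the elapsed wall time in ms
    static long runBlockingTasks(int tasks, Duration blockFor) {
        long start = System.nanoTime();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, tasks).forEach(i -> executor.submit(() -> {
                Thread.sleep(blockFor); // blocking call: the virtual thread unmounts
                return i;
            }));
        } // ExecutorService.close() waits for all submitted tasks to finish
        return Duration.ofNanos(System.nanoTime() - start).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + runBlockingTasks(10_000, Duration.ofSeconds(1)));
    }
}
```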
The part "they aren't intended for long-running CPU-intensive operations" caught my attention. The listener is a long-running process that only stops when the `close()` method is called (the snippet doesn't show it, but if an exception happens, it is reported, logged and then swallowed). I've already discussed this with some team members, and some of them seem very sure about using virtual threads for this kind of task. I'm asking this question just to be sure there is nothing wrong with it.
Useful information:

- Each instance can have up to 50 listeners (and therefore 50 threads / virtual threads, created dynamically)
- At first, 2 instances are started; they are then horizontally scaled based on CPU usage and number of listeners
Is there any problem with using virtual threads like this? Thank you guys!
After reading and debugging the code of the `KafkaConsumer#poll(Duration)` method, I came to the conclusion that there's no harm in running this code on a virtual thread.

`KafkaConsumer#poll(Duration)` is, in essence, just a loop that controls the interaction and polling intervals between the client and the broker.
When you call, for example, `kafkaConsumer.poll(Duration.ofSeconds(20))`, the underlying code may send multiple fetch requests to the broker within that 20-second window. It does so when the fetch conditions haven't been satisfied; these conditions are controlled by the `fetch.min.bytes` and `fetch.max.wait.ms` consumer properties. Suppose `fetch.min.bytes` is 300 bytes and `fetch.max.wait.ms` is 500 milliseconds. If fewer than 300 bytes are available when you call `kafkaConsumer.poll(Duration.ofSeconds(20))`, the broker holds the fetch request for up to 500 ms and then responds with whatever it has, which may be nothing. After an empty response, the consumer issues another fetch request, and it keeps doing so until 20 seconds have passed since the initial call to `KafkaConsumer#poll` or until the broker returns records (for example, once 300 bytes are available).
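The fetch/poll interaction described above can be illustrated with a small time-simulated model. This is a simplified sketch, not Kafka's client code: `PollLoopSimulation`, `poll` and `PollResult` are hypothetical, nothing actually sleeps or opens a socket, and the example values (20 s timeout, `fetch.min.bytes` = 300, `fetch.max.wait.ms` = 500) are the ones from the text.

```java
import java.time.Duration;
import java.util.function.LongToIntFunction;

// Time-simulated model of the poll(Duration) behaviour; NOT Kafka's actual client code.
public class PollLoopSimulation {

    record PollResult(int bytesReturned, int fetchRequests, long elapsedMs) {}

    // availableBytesAt maps a simulated time (ms since poll started) to the number
    // of bytes the broker could return at that moment.
    static PollResult poll(Duration timeout, int fetchMinBytes, long fetchMaxWaitMs,
                           LongToIntFunction availableBytesAt) {
        int minBytes = Math.max(1, fetchMinBytes); // clamp so the model always advances time
        long now = 0;
        long deadline = timeout.toMillis();
        int requests = 0;
        while (now < deadline) {
            requests++;
            // The broker holds the fetch until minBytes are available or
            // fetch.max.wait.ms has elapsed (bounded here by the poll deadline).
            long maxWait = Math.min(fetchMaxWaitMs, deadline - now);
            long waited = 0;
            while (waited < maxWait && availableBytesAt.applyAsInt(now + waited) < minBytes) {
                waited++;
            }
            now += waited;
            int bytes = availableBytesAt.applyAsInt(now);
            if (bytes > 0) {
                return new PollResult(bytes, requests, now); // any records end the poll
            }
            // Empty response: send another fetch until the poll timeout expires.
        }
        return new PollResult(0, requests, now); // timed out without records
    }

    public static void main(String[] args) {
        // No data at all: the 20 s window is filled with 500 ms empty fetches.
        System.out.println(poll(Duration.ofSeconds(20), 300, 500, t -> 0));
        // 100 bytes appear at t = 3000 ms: below fetch.min.bytes, so they are
        // returned when the pending fetch's 500 ms wait expires.
        System.out.println(poll(Duration.ofSeconds(20), 300, 500, t -> t >= 3000 ? 100 : 0));
    }
}
```

In the first call the model issues 40 fetch requests (20 s / 500 ms) before giving up; in the second it returns the 100 bytes at 3000 ms, on the sixth request.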
That being said, it is clear that the thread spends most of its time blocked on I/O (waiting for data on the socket, whether or not the broker returns any messages).
Maybe I misunderstood something, but to me it looks like a reasonable approach.
No, it's not a reasonable approach.
From the same Oracle documentation on virtual threads:

> As a rule of thumb, if your application never has 10,000 virtual threads or more, it is unlikely to benefit from virtual threads. Either it experiences too light a load to need better throughput, or you have not represented sufficiently many tasks to virtual threads.
We don't have anywhere near 10,000 virtual threads (each instance has at most 50 listeners), so we would not benefit from this feature. One use case I can see, and that we may introduce in our application soon, is processing the messages on virtual threads (this processing is not CPU-intensive), but we will do this only when the order does not matter: if you delegate messages from the same topic partition to be processed in parallel, you lose control over the order in which they are processed.
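A minimal sketch of processing one polled batch on virtual threads when ordering does not matter, using only the JDK. Plain strings stand in for `ConsumerRecords`, and `processBatch`/`processRecord` are hypothetical. Waiting for the whole batch before returning is an assumption (it would let the listener commit offsets or poll again only after every record is handled). Results arrive in nondeterministic order, which is exactly the ordering loss mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: parallel, order-insensitive processing of one batch on virtual threads.
public class ParallelBatchProcessing {

    // Hypothetical per-record work; stands in for non-CPU-intensive processing.
    static void processRecord(String value, ConcurrentLinkedQueue<String> sink) {
        sink.add(value.toUpperCase(Locale.ROOT));
    }

    // Processes every record of the batch on its own virtual thread and returns
    // only when all of them are done. Completion order is NOT the batch order.
    static List<String> processBatch(List<String> batch) {
        ConcurrentLinkedQueue<String> sink = new ConcurrentLinkedQueue<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String value : batch) {
                executor.submit(() -> processRecord(value, sink));
            }
        } // ExecutorService.close() waits for all submitted tasks
        return new ArrayList<>(sink);
    }

    public static void main(String[] args) {
        List<String> results = processBatch(List.of("a", "b", "c", "d"));
        System.out.println("processed (arbitrary order): " + results);
        Collections.sort(results);
        System.out.println("sorted: " + results);
    }
}
```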
Resources: