
Can Virtual Threads improve querying a database in Java?


I wanted to try out the power of virtual threads in Java in a simple application consisting of many tasks. Each task executes a query against a database that takes around 10 seconds.

My expectation was that the queries would be executed almost at the same time, because most of each task's time is spent waiting for the response.

But it doesn't work like that. I am probably missing something.

In order to execute the tasks, I am using:

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

The tasks are executed in the following way:

StopWatch stopWatch = StopWatch.createStarted();
int numberOfTasks = 10;
List<? extends Future<String>> futures;
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
    futures = IntStream.range(1, numberOfTasks + 1).mapToObj(i -> new Task(i)).map(executorService::submit).toList();
}

for (Future<String> future : futures) {
    future.get();
}
stopWatch.stop();
System.out.println(format("The total time of execution was: {0} ms", stopWatch.getTime(TimeUnit.MILLISECONDS)));

The Task.call() method looks like this:

    @Override
    public String call() {
        System.out.println(format("Task: {0} started", taskId));
        StopWatch stopWatch = StopWatch.createStarted();
        Connection connection = null;
        String result = null;
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost/sakila?user=sakila&password=sakila");
            System.out.println(format("Task: {0} connection established", taskId));
            var statement = connection.createStatement();
            System.out.println(format("Task: {0} executes SQL statement", taskId));
            ResultSet resultSet = statement.executeQuery("SELECT hello_world() AS output");
            while (resultSet.next()) {
                result = resultSet.getString("output");
            }
            statement.close();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            try {
                if (connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            System.out.println(format("Task: {0} connection closed", taskId));
        }
        stopWatch.stop();
        System.out.println(format("Task: {0} completed in {1} ms", taskId, stopWatch.getTime(TimeUnit.MILLISECONDS)));
        return result;
    }

The output is as follows:

Task: 1 started
Task: 5 started
Task: 9 started
Task: 7 started
Task: 3 started
Task: 6 started
Task: 8 started
Task: 2 started
Task: 4 started
Task: 10 started
Task: 1 connection established
Task: 6 connection established
Task: 7 connection established
Task: 9 connection established
Task: 8 connection established
Task: 5 connection established
Task: 7 executes SQL statement
Task: 2 connection established
Task: 1 executes SQL statement
Task: 6 executes SQL statement
Task: 3 connection established
Task: 8 executes SQL statement
Task: 2 executes SQL statement
Task: 5 executes SQL statement
Task: 4 connection established
Task: 4 executes SQL statement
Task: 10 connection established
Task: 10 executes SQL statement
Task: 10 connection closed
Task: 6 connection closed
Task: 10 completed in 10 319 ms
Task: 8 connection closed
Task: 2 connection closed
Task: 2 completed in 10 335 ms
Task: 1 connection closed
Task: 9 executes SQL statement
Task: 3 executes SQL statement
Task: 1 completed in 10 337 ms
Task: 4 connection closed
Task: 4 completed in 10 320 ms
Task: 5 connection closed
Task: 8 completed in 10 336 ms
Task: 6 completed in 10 336 ms
Task: 7 connection closed
Task: 5 completed in 10 338 ms
Task: 7 completed in 10 338 ms
Task: 9 connection closed
Task: 3 connection closed
Task: 9 completed in 20 345 ms
Task: 3 completed in 20 345 ms
The total time of execution was: 20 363 ms

Summary:

  1. First, all tasks were started.
  2. Then, all tasks established a JDBC connection to the database.
  3. Only 8 of the 10 tasks started executing the SELECT statement.
  4. The last 2 tasks started executing the SELECT statement only after other tasks had completed their job.

Long story short: since communication with a database is an I/O operation, the virtual threads should execute the SELECTs almost at the same time.

P.S. I have an 8-core CPU.

Thanks a lot for any explanations.


Solution

  • Can VT improve performance?

    Yes and no, of course.

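    VTs are scheduled onto a small pool of native carrier threads (by default sized to the number of available processors), so they can't execute instructions in parallel beyond what the OS threads allow. But since you want them to yield (park) while waiting, it should have helped, IF they were not hooked on MySQL driver-internal synchronized blocks. Up to JDK 23, a virtual thread that blocks inside a synchronized block pins its carrier thread, so with 8 cores at most 8 of your tasks can be waiting on the database at once, which matches the 8 out of 10 you observed.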
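    To see the multiplexing for yourself, here is a minimal sketch that starts many virtual threads and records the names of the carrier threads they were mounted on. It assumes the JDK's current VirtualThread.toString() format ("...@ForkJoinPool-1-worker-N"), which is an implementation detail rather than a public API; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CarrierSampler {

    // Returns the carrier name from a mounted virtual thread's toString(), which
    // currently looks like "VirtualThread[#22]/runnable@ForkJoinPool-1-worker-3".
    // ASSUMPTION: that format is a JDK implementation detail, not a public API.
    static String carrierName() {
        String s = Thread.currentThread().toString();
        int at = s.lastIndexOf('@');
        return at >= 0 ? s.substring(at + 1) : "unknown";
    }

    // Starts `tasks` virtual threads and records every carrier they ran on.
    static Set<String> sampleCarriers(int tasks) {
        Set<String> carriers = ConcurrentHashMap.newKeySet();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                es.submit(() -> {
                    carriers.add(carrierName());
                    Thread.yield(); // may unmount and resume on another carrier
                    carriers.add(carrierName());
                });
            }
        } // close() waits for all tasks to finish
        return carriers;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        Set<String> carriers = sampleCarriers(100 * cores);
        System.out.println(100 * cores + " virtual threads ran on "
                + carriers.size() + " carrier(s): " + carriers);
    }
}
```

    With the default scheduler and no pinning-related compensation, you should typically see no more distinct carriers than availableProcessors(), however many virtual threads you start.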

    MySQL Connector/J 9.0.0+ (currently 9.1.0) uses ReentrantLock(s) instead, which should fix this: a virtual thread that blocks while holding a ReentrantLock unmounts from its carrier, freeing it to run other virtual threads.
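    The effect can be illustrated without MySQL. The sketch below uses a hypothetical FakeConnection whose query() holds a ReentrantLock while Thread.sleep() stands in for the blocking socket read; it only mimics the idea behind the driver change and is not Connector/J code. Because sleeping while holding a ReentrantLock does not pin the carrier, 3 * cores tasks should finish in roughly one delay rather than three.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

class LockedIoSketch {

    // HYPOTHETICAL stand-in for a driver connection that guards its internal
    // state with a ReentrantLock (the idea behind the 9.x change), not real driver code.
    static final class FakeConnection {
        private final ReentrantLock lock = new ReentrantLock();

        String query(long millis) throws InterruptedException {
            lock.lock();
            try {
                // Blocking while holding a ReentrantLock lets the virtual thread
                // unmount; inside a synchronized block it would pin its carrier
                // on JDK 21 to 23.
                Thread.sleep(millis); // stands in for waiting on the socket
                return "ok";
            } finally {
                lock.unlock();
            }
        }
    }

    // Runs `tasks` queries, each on its own connection, and returns the wall time.
    static Duration run(int tasks, long millis) {
        long t0 = System.nanoTime();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < tasks; i++) {
                es.submit(() -> new FakeConnection().query(millis));
            }
        } // close() waits for all tasks to finish
        return Duration.ofNanos(System.nanoTime() - t0);
    }

    public static void main(String[] args) {
        int tasks = 3 * Runtime.getRuntime().availableProcessors();
        System.out.println(tasks + " tasks took " + run(tasks, 1000).toMillis() + " ms");
    }
}
```

    Replacing the lock with a synchronized block should, on JDK 21 to 23, stretch the run to roughly three delays, because only as many tasks as there are carriers can sleep at once.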

    (I'm assuming your database doesn't limit you to 8 connections; MySQL's default max_connections is 151. You can verify that by temporarily adding a pause to your task right after connecting. You can also use SLEEP(delay) in MySQL to make statements last longer and prove your tasks can run statements in parallel, though your hello_world() call already takes a hard 10 seconds. mysqld may see the two differently, but with a 60-second pause you'll have time to run SHOW PROCESSLIST by hand.)
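    The "pause after connecting" check can be automated: open all N connections concurrently and make every task wait at a CyclicBarrier until all of them are connected. If the server refuses a connection or the barrier times out, the check fails. This is a sketch; allOpenAtOnce and its opener parameter are hypothetical, and with JDBC the opener could be () -> DriverManager.getConnection(url) for your own url.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ConcurrentOpenCheck {

    // Opens n resources concurrently; each task holds its resource open at a
    // barrier until all n are open. Returns true only if all n were open at once.
    // `opener` is a placeholder, e.g. () -> DriverManager.getConnection(url).
    static boolean allOpenAtOnce(int n, Callable<? extends AutoCloseable> opener,
                                 long timeout, TimeUnit unit) {
        CyclicBarrier barrier = new CyclicBarrier(n);
        List<Future<Boolean>> results = new ArrayList<>();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < n; i++) {
                results.add(es.submit(() -> {
                    try (AutoCloseable resource = opener.call()) {
                        barrier.await(timeout, unit); // the "pause after connecting"
                        return true;
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        barrier.reset(); // wake tasks still waiting so none hang
                        return false;
                    }
                }));
            }
        } // close() waits until every task has finished
        try {
            for (Future<Boolean> f : results) {
                if (!f.get()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Fake resource for demonstration only; plug in a real connection opener.
        Callable<AutoCloseable> fake = () -> () -> { };
        System.out.println("all open at once: " + allOpenAtOnce(10, fake, 5, TimeUnit.SECONDS));
    }
}
```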

    If you are on Linux, you can write a small Java method that reads "/proc/thread-self/status" in each task to get some insight from the OS (the 'Pid' row in particular, which in this per-thread file holds the calling thread's kernel thread ID). See http://man.he.net/man5/proc . This shows which distinct native OS threads your VTs are running on.
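    A minimal sketch of such a method follows (Linux only; the class and method names are hypothetical). It parses the "Pid:" row; when called from a virtual thread it reports the carrier the virtual thread happens to be mounted on at that moment.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NativeThreadId {

    // Linux only. /proc/thread-self/status describes the OS thread making the
    // call; in that per-thread file the "Pid:" row is the kernel thread ID
    // (the process ID is in "Tgid:"). From a virtual thread, this is the
    // carrier it is mounted on at the moment of the read.
    static Optional<Integer> nativeTid() {
        try {
            for (String line : Files.readAllLines(Path.of("/proc/thread-self/status"))) {
                if (line.startsWith("Pid:")) {
                    return Optional.of(Integer.parseInt(line.substring(4).trim()));
                }
            }
        } catch (IOException | NumberFormatException e) {
            // not Linux, or /proc is unavailable: fall through
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 1; i <= 8; i++) {
                int id = i;
                es.submit(() -> System.out.println("task " + id + " on native thread "
                        + nativeTid().map(String::valueOf).orElse("n/a")));
            }
        }
    }
}
```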

    For Windows, I used JNA:

        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.15.0</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna-platform</artifactId>
            <version>5.15.0</version>
        </dependency>
    

    Here is a sample test:

    package vttests;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    import com.mysql.cj.jdbc.MysqlDataSource;
    import com.sun.jna.Library;
    import com.sun.jna.Native;
    
    public class TestMysqlWithVirtualThreads {
        record TD(Integer pid, Integer tid) {}
        
        public interface MyKernel32 extends Library  {
            int GetCurrentProcessId();
            int GetCurrentThreadId();
        }
        
        static MyKernel32 K32;
        static {
            try {
                K32 = Native.load("kernel32", MyKernel32.class);
            } catch (Throwable t) {
                t.printStackTrace(System.out);
            }
        }
        
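        // For a virtual thread this captures the carrier seen on first use only;
        // the virtual thread may later run on other carriers.
        static ThreadLocal<TD> thrDetailsTL = ThreadLocal.withInitial(() -> new TD(pid(), tid()));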
        
        static Integer pid() {
            return K32 != null ? K32.GetCurrentProcessId() : null;
        }
        
        static Integer tid() {
            return K32 != null ? K32.GetCurrentThreadId() : null;
        }
        
        static void p(Object msg) {
            TD td = thrDetailsTL.get();
            System.out.println("["+ (Thread.currentThread().isVirtual() ? "V" : "P")+ "/"+ Thread.currentThread().getName()+ "(" + td.pid() + "." + td.tid() + ")]: "+ msg);
        }
        
        public static void main(String[] args) throws Exception {
            testVirtualThread(false);
            testVirtualThread(true);
        }
        
        static void testVirtualThread(boolean useVT) throws InterruptedException, ExecutionException {
            int cores = Runtime.getRuntime().availableProcessors();
            p("\n\n==========================\nprocessors count = " + cores);
            
            int N = 3 * cores;
            int delay = 2000;
            long t0;
            long d;
            
            ExecutorService es = useVT ? Executors.newVirtualThreadPerTaskExecutor() : Executors.newThreadPerTaskExecutor(r -> new Thread(r));
            try (es) {
                t0 = System.nanoTime();
                List<? extends Future<?>> futures = IntStream
                    .rangeClosed(1, N) // N tasks, i.e. 3 per core
                    .mapToObj(i -> new T(i, delay))
                    .map(es::submit)
                    .toList();
                
                //--------
                
                p("waiting for all futures...");
                for(Future<?> future: futures) {
                            future.get();
                }
                d = System.nanoTime()-t0;
                p("waiting futures took "+1e-9*d+" sec");
                
                //--------
                
                p("shutting down es and await termination...");
                t0 = System.nanoTime();
                es.shutdown();
                es.awaitTermination(1, TimeUnit.SECONDS);
                d = System.nanoTime()-t0;
                p("shut down took "+1e-9*d+" sec");
                
                //--------
                
                p("closing executor...");
                t0 = System.nanoTime();
            }
            
            d = System.nanoTime() - t0;
            p("closing executor took " + 1e-9 * d + " sec");
            
            p("The end.");
        }
        
        static class T implements Runnable {
            int id;
            int delayms;
            String indent;
            
            T(int id, int delayms) {
                this.id = id;
                this.delayms = delayms;
                indent = "\t".repeat(id) +"\\_>";
            }
            
            @Override
            public void run() {
                p(indent + "task " + id + " started, pid=" + pid() + ", tid=" + tid());
                work();
                p(indent + "task " + id + " ended.");
            }
            
            void work() {
                try {
                    MysqlDataSource ds = new MysqlDataSource();
                    ds.setServerName("localhost");
                    ds.setPort(3306);
                    ds.setUser("some-dbUser");
                    ds.setPassword("some-dbPass");
                    try (Connection c = ds.getConnection();
                        PreparedStatement ps = c.prepareStatement("select sleep(?), connection_id()")
                        ) {
                        ps.setDouble(1, 1e-3*delayms);
                        
                        for(int i=1; i<=3; i++) {
                            p(indent + "call " + i + " starting ... ");
                            try (ResultSet rs = ps.executeQuery()) {
                                p(indent + "call " + i + " finished on connection id " + (rs.next() ? rs.getString(2) : null));
                            }
                        }
                    }
                    
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    What I see here (with 4 cores) is 4 native threads despite 12 virtual threads being created. With driver 8.4 it's as slow as yours; with 9.1.0 it behaves as expected.

    Now that sanity is restored, the question remains: did it help? The answer has more to do with how many threads you can afford.

    Would you rather use 20 native threads (more memory) with only 20 connections (less memory), guaranteed to work in parallel, or 100 connections (more memory) that sadly never work on more than the 4 native threads shared by the virtual threads (less memory)?

    To be clear, you only ever have N cores doing work; my point is that one side's optimization is often the other side's problem. In particular, letting 20000 virtual threads share 4 cores is great on the client side, but you'll never get 20000 JDBC connections without severe performance issues.

    As always, apply back pressure at the entry of the system: don't overcommit to work upstream only to have to deal with the problem downstream.
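    One common way to cap the downstream load while still using virtual threads is to gate the scarce resource with a Semaphore sized to your real connection budget, so surplus virtual threads park cheaply instead of each demanding a connection. This is a swapped-in technique, not something the driver provides; BoundedDbAccess is hypothetical and Thread.sleep() stands in for the query.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedDbAccess {

    // Caps how many virtual threads may use the database at once. Size the
    // permit count to your real connection budget (an assumption here), which
    // should stay well below the server's max_connections.
    private final Semaphore permits;
    private final AtomicInteger inUse = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    BoundedDbAccess(int maxConcurrentQueries) {
        this.permits = new Semaphore(maxConcurrentQueries);
    }

    // Thread.sleep() simulates the query; a real version would borrow a
    // pooled JDBC connection while holding the permit.
    String query(long millis) throws InterruptedException {
        permits.acquire(); // extra virtual threads park here cheaply
        try {
            peak.accumulateAndGet(inUse.incrementAndGet(), Math::max);
            Thread.sleep(millis);
            return "ok";
        } finally {
            inUse.decrementAndGet();
            permits.release();
        }
    }

    int peakConcurrency() {
        return peak.get();
    }

    public static void main(String[] args) {
        BoundedDbAccess db = new BoundedDbAccess(20);
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 2_000; i++) {
                es.submit(() -> db.query(10));
            }
        }
        System.out.println("peak concurrent queries: " + db.peakConcurrency());
    }
}
```

    Thousands of virtual threads can be in flight, yet at most 20 ever hold a "connection" at the same time.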

    As a client, in most protocols the number of outgoing connections you can open is likely lower than the number of threads you can tolerate being blocked waiting for a response. For a server, it's another story: virtual threads help the servicing side, just like async NIO did.