javathreadpoolexecutorjava-threadsjava-21virtual-threads

Create separate thread Pools for a pool of Platform Threads in java


I want to create pool of virtual threads running on separate pool of Java Threads.

Here's the architecture I am trying to create:

architecture

This is to enable me to create separate pools to run batch tasks in one JVM and take benefit of virtual threads with n:m mapping for each pool. So if i have 12 cores then i can create 2 ThreadPools of 6 Threads. Each pool will only execute a specific task. Each pool will have N number of VirtualThreads. So, here the mapping would be 2 pools of {N VirtualThreads -> 6 Platform Threads}.

TLDR, I want to limit the number of PlatformThreads that a pool of virtual threads can run on.

One thing i could think of is, create the thread pool and when passing in a runnable, inside the run method I could just create virtual threads but the not sure how practical is it and will i be getting the pool partition that i want. Another issue with this approach is, the virtual threads will be running in just one java thread, so no N:M mapping


Solution

  • Virtual threads were invented to avoid the bother you seem to be undertaking.

    And, virtual threads are explicitly documented as not being intended for pooling. Like facial tissues, grab a fresh one, use it, and dispose.

    Read the Java JEP, and watch videos with Ron Pressler, Alan Bateman, José Paumard, etc. to understand the purpose and nature of virtual thread technology.

    You said:

    My use case uses job dependent on each other (basically output of one job provides input to another in queue).

    … and:

    when passing in a runnable, inside the run method I could just create virtual threads

    Turn your thinking inside-out: Instead of creating virtual threads to run a bunch of related cascading tasks, create just one thread to do all the work in serial fashion.

    If you have a series of cascading tasks, each task picking up after the previous task‘s result, then simply write wrap all that work in a single Runnable/Callable. Execute that single combined task in a single new fresh virtual thread. Let that virtual thread run to completion.

    Let's devise a simple demo app. We have three tasks that feed into one another, TaskA, TaskB, and TaskC. They are housed together as AlphabetTask. The result is "ABC", each letter having been added by each subtask.

    class AlphabetTask implements Callable < String >
    {
        private final UUID id = UUID.randomUUID ( );
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
            String a = new TaskA ( ).call ( );
            String b = new TaskB ( a ).call ( );
            String c = new TaskC ( b ).call ( );
            System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
            return c;
        }
    }
    
    class TaskA implements Callable < String >
    {
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskA. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return "A";
        }
    }
    
    class TaskB implements Callable < String >
    {
        private final String input;
    
        public TaskB ( final String input )
        {
            this.input = input;
        }
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskB. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return this.input + "B";
        }
    }
    
    class TaskC implements Callable < String >
    {
        private final String input;
    
        public TaskC ( final String input )
        {
            this.input = input;
        }
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskC. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return this.input + "C";
        }
    }
    

    We establish three instances of AlphabetTask.

    Collection < AlphabetTask > alphabetTasks =
            List.of (
                    new AlphabetTask ( ) ,
                    new AlphabetTask ( ) ,
                    new AlphabetTask ( )
            );
    

    We submit all of those instances to an executor service. For each of those three AlphabetTasks, the executor assigns a fresh new virtual thread. Within each virtual thread, each of our subtasks are called in sequential order.

    Notice that we can use try-with-resources syntax to automatically close our executor service if its tasks complete in under a day.

    List < Future < String > > futures = List.of ( );
    try (
            ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
    )
    {
        try
        {
            futures = executorService.invokeAll ( alphabetTasks );
        } catch ( InterruptedException e )
        {
            throw new RuntimeException ( e );
        }
    }
    // The try-with-resources blocks here if executor service has any uncompleted tasks.
    futures.forEach ( stringFuture -> {
        try
        {
            System.out.println ( stringFuture.get ( ) );
        } catch ( InterruptedException | ExecutionException e )
        {
            throw new RuntimeException ( e );
        }
    } );
    
    

    Caveat: Virtual threads are appropriate to tasks where the code involves blocking, such as file I/O, network I/O, database calls, waiting on a lock, etc. Do not use virtual threads to run CPU-bound tasks such as video encoding (no blocking involved).

    When run:

    Starting AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c 2023-12-03T20:30:03.442091Z
    Starting AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 2023-12-03T20:30:03.442091Z
    Starting AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b 2023-12-03T20:30:03.442091Z
    Running TaskA. 2023-12-03T20:30:03.452388Z
    Running TaskA. 2023-12-03T20:30:03.452392Z
    Running TaskA. 2023-12-03T20:30:03.452383Z
    Running TaskB. 2023-12-03T20:30:03.556780Z
    Running TaskB. 2023-12-03T20:30:03.687342Z
    Running TaskC. 2023-12-03T20:30:03.812744Z
    Running TaskB. 2023-12-03T20:30:04.108820Z
    Running TaskC. 2023-12-03T20:30:04.278596Z
    Ending AlphabetTask 94a4d5b9-3ed9-43d4-ba66-509380fa9f8b Result: ABC 2023-12-03T20:30:04.310085Z
    Running TaskC. 2023-12-03T20:30:04.360861Z
    Ending AlphabetTask 3a594ed1-a76e-4927-83b1-2d6bc81f566c Result: ABC 2023-12-03T20:30:04.624803Z
    Ending AlphabetTask 12743216-8e42-4be1-bfc4-1893e08e58a7 Result: ABC 2023-12-03T20:30:04.953132Z
    ABC
    ABC
    ABC
    

    Beware: Output on console may not appear in chronological order when calling System.out.println across threads. Include and inspect timestamps if you care about order.


    Here is all of that code for your copy-paste convenience, to be dropped into a single .java file.

    package work.basil.example.threading;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Collection;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.*;
    
    public class Subtasks
    {
        public static void main ( String[] args )
        {
            Subtasks app = new Subtasks ( );
            app.demo ( );
        }
    
        private void demo ( )
        {
            Collection < AlphabetTask > alphabetTasks =
                    List.of (
                            new AlphabetTask ( ) ,
                            new AlphabetTask ( ) ,
                            new AlphabetTask ( )
                    );
            List < Future < String > > futures = List.of ( );
            try (
                    ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
            )
            {
                try
                {
                    futures = executorService.invokeAll ( alphabetTasks );
                } catch ( InterruptedException e )
                {
                    throw new RuntimeException ( e );
                }
            }
            // The try-with-resources blocks here if executor service has any uncompleted tasks.
            futures.forEach ( stringFuture -> {
                try
                {
                    System.out.println ( stringFuture.get ( ) );
                } catch ( InterruptedException | ExecutionException e )
                {
                    throw new RuntimeException ( e );
                }
            } );
        }
    }
    
    class AlphabetTask implements Callable < String >
    {
        private final UUID id = UUID.randomUUID ( );
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
            String a = new TaskA ( ).call ( );
            String b = new TaskB ( a ).call ( );
            String c = new TaskC ( b ).call ( );
            System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
            return c;
        }
    }
    
    class TaskA implements Callable < String >
    {
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskA. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return "A";
        }
    }
    
    class TaskB implements Callable < String >
    {
        private final String input;
    
        public TaskB ( final String input )
        {
            this.input = input;
        }
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskB. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return this.input + "B";
        }
    }
    
    class TaskC implements Callable < String >
    {
        private final String input;
    
        public TaskC ( final String input )
        {
            this.input = input;
        }
    
        @Override
        public String call ( ) throws Exception
        {
            System.out.println ( "Running TaskC. " + Instant.now ( ) );
            Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
            return this.input + "C";
        }
    }
    

    You said:

    So if i have 12 cores then i can create 2 ThreadPools of 6 Threads.

    You do not have as much control as you seem to believe. Which platform-threads run on which core at which time for what duration is all up to the host OS. And the scheduling behavior varies moment by moment depending on the current loads on the computer. At any moment, none, few, or all of your threads may be executing.