Tags: java, fork-join, project-loom, structured-concurrency

In Loom, can I use virtual threads for Recursive[Action/Task]?


Is it possible to use RecursiveAction, for example, with a pool of virtual threads instead of the fork/join pool (before I attempt a poorly designed, custom effort)?


Solution

  • RecursiveAction is a subclass of ForkJoinTask which is, as the name suggests and the documentation even says literally, an

    Abstract base class for tasks that run within a ForkJoinPool.

    While a ForkJoinPool can be customized with a thread factory, it doesn’t accept the standard ThreadFactory but a special ForkJoinPool.ForkJoinWorkerThreadFactory that must produce ForkJoinWorkerThread instances. Since ForkJoinWorkerThread is a subclass of Thread, its instances are necessarily platform threads; a virtual thread can never be an instance of such a subclass, so a virtual thread factory can’t provide them, as the sketch below illustrates.
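
    The mismatch is already visible in the factory types. Here is a minimal, self-contained sketch; the class name FactoryMismatch and the anonymous ForkJoinWorkerThread subclass are illustrative assumptions, not taken from the original answer:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;
    import java.util.concurrent.ThreadFactory;
    
    class FactoryMismatch {
        public static void main(String[] args) {
            // a ForkJoinPool can only be customized with a factory that returns
            // ForkJoinWorkerThread instances, which are always platform threads
            ForkJoinPool.ForkJoinWorkerThreadFactory fjFactory =
                pool -> new ForkJoinWorkerThread(pool) {};
            ForkJoinPool customPool = new ForkJoinPool(4, fjFactory, null, false);
    
            // a virtual thread factory produces virtual Thread instances, which can
            // never be instances of a Thread subclass like ForkJoinWorkerThread
            ThreadFactory virtualFactory = Thread.ofVirtual().factory();
            Thread vt = virtualFactory.newThread(() -> {});
    
            System.out.println(vt.isVirtual());                      // true
            System.out.println(vt instanceof ForkJoinWorkerThread);  // false
    
            customPool.shutdown();
        }
    }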

    So, you can’t use RecursiveAction with virtual threads. The same applies to RecursiveTask. But it’s worth rethinking what using these classes with virtual threads would gain you.

    The main challenge, implementing the decomposition of your task into sub-tasks, is on you anyway. What these classes provide are features specifically for dealing with the Fork/Join pool and for balancing the workload across the available platform threads. When you want to perform each sub-task on its own virtual thread, you don’t need this. So you can easily implement a recursive task with virtual threads without the built-in classes, e.g.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    
    record PseudoTask(int from, int to) {
        public static CompletableFuture<Void> run(int from, int to) {
            return CompletableFuture.runAsync(
                new PseudoTask(from, to)::compute, Thread::startVirtualThread);
        }
    
        protected void compute() {
            int mid = (from + to) >>> 1;
            if(mid == from) {
                // simulate actual processing with potentially blocking operations
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            }
            else {
                CompletableFuture<Void> sub1 = run(from, mid), sub2 = run(mid, to);
                sub1.join();
                sub2.join();
            }
        }
    }
    

    This example just doesn’t bother with limiting the subdivision or with avoiding blocking join() calls, and it still performs well when running, e.g., PseudoTask.run(0, 1_000).join(). You might notice, though, that with larger ranges, where the individual sub-tasks are rather cheap, the techniques known from other recursive task implementations can be useful here too.

    E.g., you may submit only one half of the range to another thread and process the other half locally, like this:

    record PseudoTask(int from, int to) {
        public static CompletableFuture<Void> run(int from, int to) {
            return CompletableFuture.runAsync(
                new PseudoTask(from, to)::compute, Thread::startVirtualThread);
        }
    
        protected void compute() {
            CompletableFuture<Void> f = null;
            for(int from = this.from, mid; ; from = mid) {
                mid = (from + to) >>> 1;
                if (mid == from) {
                    // simulate actual processing with potentially blocking operations
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                    break;
                } else {
                    // submit the first half to another virtual thread;
                    // the loop continues with the second half in this thread
                    CompletableFuture<Void> sub1 = run(from, mid);
                    // collect all forked sub-tasks into a single future
                    if(f == null) f = sub1; else f = CompletableFuture.allOf(f, sub1);
                }
            }
            // wait for the forked sub-tasks, if any
            if(f != null) f.join();
        }
    }
    

    which makes a notable difference when running, e.g., PseudoTask.run(0, 1_000_000).join(), which will use only about 1 million threads in the second example rather than about 2 million in the first. But, of course, that’s a discussion on a different level than with platform threads, where neither approach would work reasonably.
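
    To verify those numbers, one can route thread creation through a counter. The following standalone sketch combines the second variant with such a counter; the STARTED field, the main method, the shortened parking time, and the smaller range are assumptions added for the demonstration, not part of the original code:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.LockSupport;
    
    record PseudoTask(int from, int to) {
        // hypothetical counter, added only to observe how many virtual threads get started
        static final AtomicLong STARTED = new AtomicLong();
    
        public static CompletableFuture<Void> run(int from, int to) {
            return CompletableFuture.runAsync(
                new PseudoTask(from, to)::compute,
                r -> { STARTED.incrementAndGet(); Thread.startVirtualThread(r); });
        }
    
        protected void compute() { // second variant: fork one half, keep the other
            CompletableFuture<Void> f = null;
            for(int from = this.from, mid; ; from = mid) {
                mid = (from + to) >>> 1;
                if(mid == from) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); // shortened for the demo
                    break;
                }
                CompletableFuture<Void> sub = run(from, mid);
                f = f == null? sub: CompletableFuture.allOf(f, sub);
            }
            if(f != null) f.join();
        }
    
        public static void main(String[] args) {
            PseudoTask.run(0, 1 << 16).join();
            System.out.println(STARTED.get()); // one thread per unit of work: prints 65536
        }
    }

    With the first variant’s compute() instead, the counter ends up at roughly twice that value, since every node of the recursion tree gets its own thread rather than only the leaves.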


    Another upcoming option is the StructuredTaskScope, which allows you to spawn sub-tasks and wait for their completion:

    record PseudoTask(int from, int to) {
        public static void run(int from, int to) {
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                new PseudoTask(from, to).compute(scope);
                scope.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    
        // returning Void rather than void allows the method to be used as a Callable in scope.fork
        protected Void compute(StructuredTaskScope<Object> scope) {
            for(int from = this.from, mid; ; from = mid) {
                mid = (from + to) >>> 1;
                if (mid == from) {
                    // simulate actual processing with potentially blocking operations
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
                    break;
                } else {
                    var sub = new PseudoTask(from, mid);
                    scope.fork(() -> sub.compute(scope));
                }
            }
            return null;
        }
    }
    

    Here, the tasks do not wait for the completion of their sub-tasks; only the root task waits for the completion of all tasks. But this feature is still in incubator state, hence it may take even longer than the virtual threads feature to become production-ready.
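
    Note that in the incubating releases (JDK 19/20) StructuredTaskScope lives in the jdk.incubator.concurrent module, so the example needs import jdk.incubator.concurrent.StructuredTaskScope; and the --add-modules jdk.incubator.concurrent option. If failures of sub-tasks should also surface in the caller, ShutdownOnFailure offers throwIfFailed(); the following is a sketch of a drop-in replacement for the run method above, where the extra call and the widened catch clause are additions, not part of the original answer:

    public static void run(int from, int to) {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            new PseudoTask(from, to).compute(scope);
            scope.join();
            // rethrow the exception of the first failed sub-task, if any
            scope.throwIfFailed();
        } catch (InterruptedException | ExecutionException e) {
            // ExecutionException is java.util.concurrent.ExecutionException
            throw new IllegalStateException(e);
        }
    }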