Tags: java, concurrency, java-21, virtual-threads, structured-concurrency

Is it possible to cancel a forked subtask in a StructuredTaskScope?


Kotlin coroutines let you cancel a coroutine:

suspend fun forgettingTheBirthDayRoutine() {
  coroutineScope {
    val workingJob = launch {
      workingConsciousness()
    }
    launch {
      delay(2000L)
      workingJob.cancel()
      workingJob.join()
      logger.info("I forgot the birthday! Let's go to the mall!")
    }
  }
}

The first time workingConsciousness reaches a suspension point after the call to workingJob.cancel(), it is cancelled, along with its child coroutines.

Is it possible to achieve the same behavior using structured concurrency in Java (StructuredTaskScope and its subclasses)?


Solution

  • The only way I know of to cancel a task in Java is to call Thread.interrupt on the thread that runs it. That is exactly how java.util.concurrent.FutureTask implements cancel(true): it simply attempts to interrupt the thread that executes its Runnable/Callable.
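
    As a small illustration of that FutureTask behaviour, here is a sketch that runs a FutureTask on a plain thread and cancels it with cancel(true). The class and method names (FutureTaskCancelDemo, cancelInterruptsRunner) are made up for this example, and the 60-second sleep only stands in for real work.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical demo class, not part of any library.
    class FutureTaskCancelDemo {

        /** Returns true if the task body saw an interrupt after cancel(true). */
        static boolean cancelInterruptsRunner() {
            CountDownLatch started = new CountDownLatch(1);
            AtomicBoolean interrupted = new AtomicBoolean(false);

            FutureTask<String> task = new FutureTask<>(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);      // stands in for long-running work
                } catch (InterruptedException e) {
                    interrupted.set(true);     // cancel(true) lands here
                }
                return "done";
            });

            Thread runner = new Thread(task);
            runner.start();
            try {
                started.await();               // make sure the body is running
                task.cancel(true);             // true = interrupt the runner thread
                runner.join();                 // wait for the body to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return interrupted.get() && task.isCancelled();
        }

        public static void main(String[] args) {
            System.out.println(cancelInterruptsRunner());
        }
    }
    ```

    With cancel(false) the task would still be marked as cancelled, but the running thread would not be interrupted and the sleep would run to completion.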

    So, to attempt to cancel a StructuredTaskScope.Subtask, we just need to interrupt the thread that runs it. However, as the OP correctly said, that thread is not accessible to StructuredTaskScope users.

    To make this thread accessible, the ideal solution would be to implement the StructuredTaskScope.Subtask interface (possibly via delegation), store the thread reference, and add a cancel method. This is impossible, however, because the interface is sealed.

    To work around this, and assuming we control the Callable passed to StructuredTaskScope.fork, we can use the following delegating wrapper:

    import java.util.concurrent.Callable;

    class CancellableCallable<V> implements Callable<V> {
    
        private final Callable<V> delegatee;
        // Written by the subtask's thread, read by the cancelling thread,
        // hence volatile. Stays null until call() has started.
        private volatile Thread thread;
    
        public CancellableCallable(Callable<V> delegatee) {
            this.delegatee = delegatee;
        }
    
        @Override
        public V call() throws Exception {
            thread = Thread.currentThread();
            return delegatee.call();
        }
    
        public Thread getThread() {
            return thread;
        }
    
    }
    

    Then, if the Callable passed to fork is a CancellableCallable, such a Subtask can be cancelled:

    ((CancellableCallable<?>) subtask.task()).getThread().interrupt();
    

    As a convenience we could have a subclass of StructuredTaskScope (which is permitted and even advisable) that wraps the above in a cancel method:

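    import java.util.concurrent.Callable;
    import java.util.concurrent.StructuredTaskScope;

    // StructuredTaskScope is a preview API in Java 21 (--enable-preview).
    class CancellableTaskScope<T> extends StructuredTaskScope<T> {
    
        public <U extends T> void cancel(Subtask<U> task) {
            // UNAVAILABLE: forked but not completed (or the scope was already shut down)
            if (task.state() != Subtask.State.UNAVAILABLE) 
               throw new IllegalStateException("Task is not running");
            final Callable<? extends U> callable = task.task();
            if (callable instanceof CancellableCallable<?> cancellable) {
                Thread thread = cancellable.getThread();
                // null means call() has not started yet; the attempt is then a no-op
                if (thread != null) {
                    thread.interrupt();
                }
            } else {
                throw new IllegalArgumentException("Task is not constructed with CancellableCallable");
            }
        }
    
    }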
    

    It is easy to see that the above does not provide atomic cancellation the way FutureTask does, so it may attempt to interrupt a thread that has already terminated. That is probably fine, since the JavaDoc of Thread.interrupt states: Interrupting a thread that is not alive need not have any effect.
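
    To make the "late interrupt" case concrete, here is a minimal sketch that interrupts a plain thread after it has finished. The class and method names are invented for this example; it only shows that the call completes without throwing, it does not involve StructuredTaskScope.

    ```java
    // Hypothetical demo class, not part of any library.
    class InterruptAfterTerminationDemo {

        /** Interrupts a thread that has already finished and reports its state. */
        static String interruptTerminatedThread() {
            Thread worker = new Thread(() -> { /* finishes immediately */ });
            worker.start();
            try {
                worker.join();                 // worker is terminated from here on
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            worker.interrupt();                // late cancellation attempt, no exception
            return worker.getState() + ", alive=" + worker.isAlive();
        }

        public static void main(String[] args) {
            System.out.println(interruptTerminatedThread()); // prints "TERMINATED, alive=false"
        }
    }
    ```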

    Usage of the above might look like the following:

    try (CancellableTaskScope<String> scope = new CancellableTaskScope<>()) {
        Subtask<String> subtask1 = scope.fork(new CancellableCallable<>(() -> {
            try {
                Thread.sleep(SOME_INTERVAL);
            } catch (InterruptedException e) {
                // interrupted by scope.cancel(subtask1); fall through and return
            }
            return "OK";
        })); 
        // the scope is typed to String, so every forked subtask must return a String
        Subtask<String> subtask2 = scope.fork(...);            
        scope.cancel(subtask1);
        scope.join(); 
    } 
    

    Notice that, as usual, the burden of reacting to the interruption lies solely with the task itself, i.e. the Callable implementation. Native methods are known to ignore it. That is why the word attempt is used above intentionally. In that respect FutureTask is no different, of course.
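
    The following sketch illustrates that point with a CPU-bound loop on a plain thread: one variant checks the interrupt flag and stops, the other ignores it and keeps running. All names here (CooperativeInterruptDemo, stopsAfterInterrupt, the forceStop flag) are invented for the example, and the one-second wait is an arbitrary choice.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical demo class, not part of any library.
    class CooperativeInterruptDemo {

        /** Interrupts a busy loop and reports whether it stopped within one second. */
        static boolean stopsAfterInterrupt(boolean checkInterruptFlag) {
            AtomicBoolean forceStop = new AtomicBoolean(false); // demo-only escape hatch
            CountDownLatch started = new CountDownLatch(1);
            Thread worker = new Thread(() -> {
                started.countDown();
                while (!forceStop.get()) {
                    if (checkInterruptFlag && Thread.currentThread().isInterrupted()) {
                        return;                // cooperative task: reacts to the interrupt
                    }
                    Thread.onSpinWait();       // stands in for CPU-bound work
                }
            });
            worker.start();
            try {
                started.await();
                worker.interrupt();            // the cancellation attempt
                worker.join(1_000);            // give the task a moment to react
                boolean stopped = !worker.isAlive();
                forceStop.set(true);           // let the non-cooperative loop end
                worker.join();
                return stopped;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        public static void main(String[] args) {
            System.out.println(stopsAfterInterrupt(true));
            System.out.println(stopsAfterInterrupt(false));
        }
    }
    ```

    Blocking calls such as Thread.sleep do this check for you by throwing InterruptedException; pure computation has to poll the flag itself.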

    EDIT in response to OP's request.

    @Holger suggested using a FutureTask to construct the Callable passed to the fork method. This brings in the full power of FutureTask, such as its atomically set cancellation state. The snippet above then looks like this:

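    try (StructuredTaskScope<String> scope = new StructuredTaskScope<>()) {
        FutureTask<String> task1 = new FutureTask<>( ()-> {
            try {
                Thread.sleep(SOME_INTERVAL);
            } catch (InterruptedException e) {
                // interrupted by task1.cancel(true); fall through and return
            }
            return "OK";
        }); 
        Subtask<String> subtask1 = scope.fork( () -> {
            task1.run(); 
            // throws CancellationException if task1 was cancelled
            return task1.get();
        }); 
        // the scope is typed to String, so every forked subtask must return a String
        Subtask<String> subtask2 = scope.fork(...);            
        task1.cancel(...);
        scope.join(); 
    }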
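
    To see what the forked lambda ends with after a cancellation, here is a sketch that runs the same run()/get() pair on a plain thread instead of through scope.fork. The plain thread is a stand-in so the example does not depend on the preview API, and the class and method names are invented. Inside a real scope, the subtask would presumably finish as FAILED with this exception.

    ```java
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.atomic.AtomicReference;

    // Hypothetical demo class, not part of any library.
    class FutureTaskWrapperDemo {

        /** Returns the exception the run()/get() wrapper ends with after cancel(true). */
        static Throwable outcomeOfCancelledWrapper() {
            CountDownLatch started = new CountDownLatch(1);
            FutureTask<String> task1 = new FutureTask<>(() -> {
                started.countDown();
                Thread.sleep(60_000);          // long-running work, interrupted by cancel(true)
                return "OK";
            });

            AtomicReference<Throwable> outcome = new AtomicReference<>();
            // Same body as the lambda passed to fork, run here on a plain thread.
            Thread worker = new Thread(() -> {
                try {
                    task1.run();               // returns normally even when cancelled
                    task1.get();               // reports the cancellation
                } catch (Throwable t) {
                    outcome.set(t);
                }
            });
            worker.start();
            try {
                started.await();               // make sure task1 is running
                task1.cancel(true);
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return outcome.get();
        }

        public static void main(String[] args) {
            System.out.println(outcomeOfCancelledWrapper() instanceof CancellationException);
        }
    }
    ```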