Kotlin coroutines let to cancel a coroutine:
suspend fun forgettingTheBirthDayRoutine() {
coroutineScope {
val workingJob = launch {
workingConsciousness()
}
launch {
delay(2000L)
workingJob.cancel()
workingJob.join()
logger.info("I forgot the birthday! Let's go to the mall!")
}
}
}
The first time the workingConsciousness
reaches a suspending point after the invocation of workingJob.cancel()
, it will be canceled, as will its children's coroutines.
Is it possible to achieve the same behavior using structured concurrency in Java (StructuredTaskScope
and its subclasses)?
The only known for me way to cancel a task in Java is to invoke thread.interrupt
for this thread. That's exactly the way java.util.concurrent.FutureTask
implements its cancel
method - it plainly attempts to interrupt the thread which executes its Runnable
/Callable
.
So, to attempt to cancel a StructuredTaskScope.Subtask
we just need to interrupt a thread it executes on (or is executed by). However, as the OP correctly said, such a thread is not accessible for StructuredTaskScope
users.
To make this thread accessible the ideal solution would be to implement (possibly via delegation) StructuredTaskScope.Subtask
interface, storing the thread reference and extending it with cancel
method. This is, however, impossible to do because this interface is sealed
.
To circumvent this obstacle and assuming that we could have a control over the Callable
argument, passed to StructuredTaskScope.fork
method, we could have the following delegation:
class CancellableCallable<V> implements Callable<V> {
private final Callable<V> delegatee;
private Thread thread;
public CancellableCallable(Callable<V> delegatee) {
this.delegatee = delegatee;
}
@Override
public V call() throws Exception {
thread = Thread.currentThread();
return delegatee.call();
}
public Thread getThread() {
return thread;
}
}
then, if the exact type of Callable
, passed to fork
method, was CancellableCallable
, it would be possible to cancel such a SubTask
:
((CancellableCallable<?>) subtask.task()).getThread().interrupt();
As a convenience we could have a subclass of StructuredTaskScope
(which is permitted and even advisable) that wraps the above in a cancel
method:
class CancellableTaskScope<T> extends StructuredTaskScope<T> {
public <U extends T> void cancel(Subtask<U> task) {
if (task.state() != Subtask.State.UNAVAILABLE)
throw new IllegalStateException("Task is not running");
final Callable<? extends U> callable = task.task();
if (callable instanceof CancellableCallable) {
((CancellableCallable<?>) callable).getThread().interrupt();
} else {
throw new IllegalArgumentException("Task is not constructed with CancellableCallable");
}
}
}
It is easy to see that the above does not provide atomic cancelling like FutureTask
does, so there could be an attempt to interrupt already terminated thread, which might be quite OK, as Thread.interrupt
's JavaDoc states: Interrupting a thread that is not alive need not have any effect.
The usage of the above might look like following:
try (CancellableTaskScope<String> scope = new CancellableTaskScope<>()) {
Subtask<String> subtask1 = scope.fork(new CancellableCallable<>(() -> {
try {
Thread.sleep(SOME_INTERVAL);
} catch (InterruptedException e) {
}
return "OK";
}));
Subtask<Integer> subtask2 = scope.fork(...);
scope.cancel(subtask1);
scope.join();
}
Notice that, as usual, the burden of reaction on the thread interruption is solely on the task itself, Callable
implementation. Native methods are known to ignore it. That's why the word attempt is used above intentionally. In tat respect, FutureTask
, is no different, of course.
EDIT in response to OP's request.
@Holger suggested to use FutureTask
for constructing a Callable
, passed to fork
method. This employs the full power of FutureTask
, atomically set cancellation flag, for example. Then the snippet above will look like this:
try (StructuredTaskScope<String> scope = new StructuredTaskScope<>()) {
FutureTask<String> task1 = new FutureTask<>( ()-> {
try {
Thread.sleep(SOME_INTERVAL);
} catch (InterruptedException e) {
}
return "OK";
});
Subtask<String> subtask1 = scope.fork( () -> {
task1.run();
return task1.get();
});
Subtask<Integer> subtask2 = scope.fork(...);
task1.cancel(...);
scope.join();
}