I'm in doubt if the solution below is correct for the following:
Below is the code used to solve this:
public class Processor {
private final ExecutorService execService = Executors.newSingleThreadExecutor();
private final Object monitor = new Object();
private AtomicReference<Task> runningTask = new AtomicReference<>(null);
public Optional<CompletableFuture<String>> processDataAsync(String data) {
if (runningTask.get() != null)
return Optional.empty(); //rejecting data process request because another request is already being served
synchronized (monitor) {
if (runningTask.get() != null)
return Optional.empty();
CompletableFuture<String> f = new CompletableFuture<>();
f.whenComplete((r, e) -> runningTask.set(null)); //when processing completes, another data process request can be accepted
Task task = new Task(f, data);
runningTask.set(task);
execService.submit(task);
return Optional.of(f);
}
}
}
Task
is Runnable
as below:
public class Task implements Runnable {
private final CompletableFuture<String> result;
private final String data;
public Task(CompletableFuture<String> result, String data) {
this.result = result;
this.data = data;
}
@Override
public void run() {
String processingResult = processData(data); //does some blocking stuff with data, returning result of processing
result.complete(processingResult);
}
}
What confuses me here is the synchronization (i.e. blocking) in processDataAsync
. I understand that blocking here is very short and not critical, but shouldn't asynchronous method be always implemented without blocking? If so, I can't imagine how "single processing" can be achieved without synchronization.
Perhaps I’ve misunderstood the problem, but it seems you are over complicating the situation. Rather than keeping track of the task, keep track of the Future
returned by ExecutorService#submit
. A Future
object is your tether leading back to the task being executed.
Define a member field for the Future
.
Future future ;
Test the Future
when request to process is made. Call Future#isDone
to test. Javadoc says:
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
if( Objects.isNull( this.future ) || this.future.isDone() ) {
… proceed with request to process data.
this.future = executorService.submit( … ) ;
return Optional.of( this.future ) ;
} else {
… Refuse the request to process data.
… Do *not* submit any task to the executor service.
return Optional.empty() ;
}
TaskMaster
solutionIn various comments, you presented more details of your problem.
You want to submit tasks from various threads to a single object. Let's call that object TaskMaster
for clarity. That TaskMaster
instance tracks whether its nested executor service is currently working on a task or not.
Optional< Future >
.Optional< Future >
.Since the code shown above discussed here will be accessed across threads, we must protect the Future future ;
in a thread-safe manner. One easy way to do that is to mark synchronized
on the one and only method for tendering a task to the TaskMaster
.
package singletask;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
// Runs one task at a time, rejecting any task tendered while already executing a task.
public class TaskMaster
{
// Member fields.
private final ExecutorService executorService;
private Future future;
// Constructor
public TaskMaster ( )
{
this.executorService = Executors.newSingleThreadExecutor();
}
public synchronized Optional < Future > tender ( Runnable task )
{
if ( Objects.isNull( this.future ) || this.future.isDone() )
{
// Proceed with immediate execution of the tendered task.
this.future = executorService.submit( task );
return Optional.of( this.future );
} else
{
// Already busy on a task. Reject this tendered task.
return Optional.empty();
}
}
public void shutdownAndAwaitTerminationOfExecutorService ( )
{
if ( Objects.isNull( this.executorService ) ) { return; }
this.executorService.shutdown(); // Stop new tasks from being submitted.
try
{
// Wait a while for existing tasks to terminate
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
{
this.executorService.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
System.err.println( "Pool did not terminate." );
}
}
catch ( InterruptedException ex )
{
// (Re-)Cancel if current thread also interrupted
this.executorService.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
}
Usage shown next. Beware: Multithreaded calls to System.out.println
do not always appear on the console chronologically. Always include, and inspect, timestamps to verify the order.
package singletask;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
public class App
{
public static void main ( String[] args )
{
App app = new App();
app.demo();
}
private void demo ( )
{
Runnable task = ( ) -> {
UUID taskID = UUID.randomUUID();
System.out.println( "Starting task " + taskID + " at " + Instant.now() );
// Pretend to do some long hard work, by sleeping.
try { Thread.sleep( Duration.ofSeconds( 5 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "Ending task " + taskID + " at " + Instant.now() );
};
TaskMaster taskMaster = new TaskMaster();
Optional < Future > f1 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
try { Thread.sleep( Duration.ofMillis( 500 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
System.out.println( "f1 = " + f1 );
Optional < Future > f2 = taskMaster.tender( task ); // We expect rejection, showing `Optional.empty`.
System.out.println( "f2 = " + f2 );
try { Thread.sleep( Duration.ofSeconds( 7 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
Optional < Future > f3 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
System.out.println( "f3 = " + f3 );
// Attempt a graceful shutwdown.
taskMaster.shutdownAndAwaitTerminationOfExecutorService();
System.out.println( "Demo ending at " + Instant.now() );
}
}
When run.
Starting task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:42:58.516852Z
f1 = Optional[java.util.concurrent.FutureTask@1fb3ebeb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31befd9f[Wrapped task = singletask.App$$Lambda$14/0x0000000800c01208@1c20c684]]]
f2 = Optional.empty
Ending task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:43:03.530138Z
Starting task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:06.011485Z
f3 = Optional[java.util.concurrent.FutureTask@816f27d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1218025c[Wrapped task = singletask.App$$Lambda$14/0x0000000800c01208@1c20c684]]]
Ending task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:11.013576Z
Demo ending at 2022-02-03T06:43:11.014180Z
ExecutorService
While that TaskMaster
code above works, and offers the Optional
objects you asked for, I would recommend another approach.
👉 You can make your own version of an ExecutorService
. Your implementation could do something similar to what we saw above, tracking a single task’s execution.
Rather than returning an Optional< Future >
, the more orthodox approach would be to provide a submit
method implementation that either:
Future
if the tendered task can be immediately executed, or …RejectedExecutionException
because a task is already running.This behavior is defined in the Javadoc of ExecutorService
. Any methods of your which tender tasks to this custom executor service would trap for this exception rather than examine an Optional
.
In other words, to modify an excerpt from your Comment:
If two users simultaneously try to request data processing, only one of them will succeed and receive a
Future
, and another will see an exception thrown, indicating that the request was rejected.
With this custom executor service, the calling programmer has less to learn. The calling programmer would not need to understand the semantics of the TaskMaster
class, they need to understand only the common ExecutorService
behavior.
Tip: The AbstractExecutorService
class might make creating your own executor service easier.