Tags: java, asynchronous, double-checked-locking

Asynchronous single task executor


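I'm not sure whether the solution below is correct for the following requirement: data must be processed asynchronously, one request at a time, and any request that arrives while another is still being served must be rejected.

Below is the code used to solve this: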

    public class Processor {
        private final ExecutorService execService = Executors.newSingleThreadExecutor();
        private final Object monitor = new Object();
        private AtomicReference<Task> runningTask = new AtomicReference<>(null);
        
        public Optional<CompletableFuture<String>> processDataAsync(String data) {
            if (runningTask.get() != null)
                return Optional.empty();  //rejecting data process request because another request is already being served
            
            synchronized (monitor) {
                if (runningTask.get() != null)
                    return Optional.empty();
                
                CompletableFuture<String> f = new CompletableFuture<>();
                f.whenComplete((r, e) -> runningTask.set(null));  //when processing completes, another data process request can be accepted
                
                Task task = new Task(f, data);
                runningTask.set(task);
                execService.submit(task);
                return Optional.of(f);
            }
        }
    }   

Task is Runnable as below:

    public class Task implements Runnable {
        private final CompletableFuture<String> result;
        private final String data;
        
        public Task(CompletableFuture<String> result, String data) {
            this.result = result;
            this.data = data;
        }
        
        @Override
        public void run() {
            String processingResult = processData(data);  //does some blocking stuff with data, returning result of processing
            result.complete(processingResult);
        }
    }

What confuses me here is the synchronization (i.e. blocking) in processDataAsync. I understand that the blocking here is very short and not critical, but shouldn't an asynchronous method always be implemented without blocking? If so, I can't imagine how "single processing" can be achieved without synchronization.


Solution

  • Perhaps I’ve misunderstood the problem, but it seems you are overcomplicating the situation. Rather than keeping track of the task, keep track of the Future returned by ExecutorService#submit. A Future object is your tether leading back to the task being executed.

    Define a member field for the Future.

    Future future ;
    

    Test the Future when a request to process data is made. Call Future#isDone to test. The Javadoc says:

    Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

    if( Objects.isNull( this.future ) || this.future.isDone() ) {
        … proceed with request to process data.
        this.future = executorService.submit( … ) ;
        return Optional.of( this.future ) ;
    } else {
        … Refuse the request to process data.
        … Do *not* submit any task to the executor service. 
        return Optional.empty() ;
    }
    

    TaskMaster solution

    In various comments, you presented more details of your problem.

    You want to submit tasks from various threads to a single object. Let's call that object TaskMaster for clarity. That TaskMaster instance tracks whether its nested executor service is currently working on a task or not.

    Since the code discussed above will be accessed across threads, we must protect the Future future ; field in a thread-safe manner. One easy way to do that is to mark as synchronized the one and only method for tendering a task to the TaskMaster.

    package singletask;
    
    import java.util.Objects;
    import java.util.Optional;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    // Runs one task at a time, rejecting any task tendered while already executing a task.
    public class TaskMaster
    {
        // Member fields.
        private final ExecutorService executorService;
        private Future future;
    
        // Constructor
        public TaskMaster ( )
        {
            this.executorService = Executors.newSingleThreadExecutor();
        }
    
        public synchronized Optional < Future > tender ( Runnable task )
        {
            if ( Objects.isNull( this.future ) || this.future.isDone() )
            {
                // Proceed with immediate execution of the tendered task.
                this.future = executorService.submit( task );
                return Optional.of( this.future );
            } else
            {
                // Already busy on a task. Reject this tendered task.
                return Optional.empty();
            }
        }
    
        public void shutdownAndAwaitTerminationOfExecutorService ( )
        {
            if ( Objects.isNull( this.executorService ) ) { return; }
            this.executorService.shutdown(); // Stop new tasks from being submitted.
            try
            {
                // Wait a while for existing tasks to terminate
                if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                {
                    this.executorService.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if ( ! this.executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                        System.err.println( "Pool did not terminate." );
                }
            }
            catch ( InterruptedException ex )
            {
                // (Re-)Cancel if current thread also interrupted
                this.executorService.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
    

    Usage shown next. Beware: Multithreaded calls to System.out.println do not always appear on the console chronologically. Always include, and inspect, timestamps to verify the order.

    package singletask;
    
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Optional;
    import java.util.UUID;
    import java.util.concurrent.Future;
    
    public class App
    {
        public static void main ( String[] args )
        {
            App app = new App();
            app.demo();
        }
    
        private void demo ( )
        {
            Runnable task = ( ) -> {
                UUID taskID = UUID.randomUUID();
                System.out.println( "Starting task " + taskID + " at " + Instant.now() );
                // Pretend to do some long hard work, by sleeping.
                try { Thread.sleep( Duration.ofSeconds( 5 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
                System.out.println( "Ending task " + taskID + " at " + Instant.now() );
            };
    
            TaskMaster taskMaster = new TaskMaster();
    
            Optional < Future > f1 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
            try { Thread.sleep( Duration.ofMillis( 500 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
            System.out.println( "f1 = " + f1 );
    
            Optional < Future > f2 = taskMaster.tender( task ); // We expect rejection, showing `Optional.empty`.
            System.out.println( "f2 = " + f2 );
    
            try { Thread.sleep( Duration.ofSeconds( 7 ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
            Optional < Future > f3 = taskMaster.tender( task ); // We expect acceptance, showing `Optional[java.util.concurrent.FutureTask@…`.
            System.out.println( "f3 = " + f3 );
    
            // Attempt a graceful shutdown.
            taskMaster.shutdownAndAwaitTerminationOfExecutorService();
            System.out.println( "Demo ending at " + Instant.now() );
        }
    }
    

    When run:

    Starting task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:42:58.516852Z
    f1 = Optional[java.util.concurrent.FutureTask@1fb3ebeb[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@31befd9f[Wrapped task = singletask.App$$Lambda$14/0x0000000800c01208@1c20c684]]]
    f2 = Optional.empty
    Ending task cc48efaa-390b-414d-9f3a-539e2be249b9 at 2022-02-03T06:43:03.530138Z
    Starting task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:06.011485Z
    f3 = Optional[java.util.concurrent.FutureTask@816f27d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1218025c[Wrapped task = singletask.App$$Lambda$14/0x0000000800c01208@1c20c684]]]
    Ending task a3de548c-b068-435c-a6cb-856d2f539042 at 2022-02-03T06:43:11.013576Z
    Demo ending at 2022-02-03T06:43:11.014180Z
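
    The question also asked whether "single processing" can be achieved without synchronization. One possibility, offered only as a minimal sketch, is to claim the single slot with AtomicBoolean#compareAndSet, which never blocks the caller. The class name LockFreeProcessor, the constructor taking a Function as a stand-in for the question's processData, and the shutdown method are all assumptions made for illustration; none of them come from the original code.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical lock-free variant of the question's Processor class.
class LockFreeProcessor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean busy = new AtomicBoolean(false);
    // Assumption: stands in for the blocking processData method of the question.
    private final Function<String, String> worker;

    LockFreeProcessor(Function<String, String> worker) {
        this.worker = worker;
    }

    Optional<CompletableFuture<String>> processDataAsync(String data) {
        // Only one caller can flip the flag from false to true; all others are rejected.
        if (!busy.compareAndSet(false, true)) {
            return Optional.empty();
        }
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            executor.execute(() -> {
                String value = null;
                Throwable failure = null;
                try {
                    value = worker.apply(data);
                } catch (Throwable t) {
                    failure = t;
                }
                // Free the slot before completing the future, so that a caller
                // who waits on the future can immediately tender the next request.
                busy.set(false);
                if (failure == null) {
                    result.complete(value);
                } else {
                    result.completeExceptionally(failure);
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor was shut down; release the slot and propagate.
            busy.set(false);
            throw e;
        }
        return Optional.of(result);
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        LockFreeProcessor processor = new LockFreeProcessor(String::toUpperCase);
        Optional<CompletableFuture<String>> first = processor.processDataAsync("hello");
        System.out.println(first.map(CompletableFuture::join).orElse("rejected"));
        processor.shutdown();
    }
}
```

    Compared with the synchronized method, this trades the monitor for a single atomic flag. The flag is released even when the worker throws, so a failed task does not leave the processor permanently busy.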
    

    Custom ExecutorService

    While that TaskMaster code above works, and offers the Optional objects you asked for, I would recommend another approach.

    👉 You can make your own version of an ExecutorService. Your implementation could do something similar to what we saw above, tracking a single task’s execution.

    Rather than returning an Optional< Future >, the more orthodox approach would be to provide a submit method implementation that either:

      • Returns a Future, if the task is accepted.
      • Throws a RejectedExecutionException, if the task cannot be accepted for execution.

    This behavior is defined in the Javadoc of ExecutorService. Any methods of yours that tender tasks to this custom executor service would trap for this exception rather than examine an Optional.

    In other words, to modify an excerpt from your Comment:

    If two users simultaneously try to request data processing, only one of them will succeed and receive a Future, and another will see an exception thrown, indicating that the request was rejected.

    With this custom executor service, the calling programmer has less to learn. The calling programmer would not need to understand the semantics of the TaskMaster class; they need to understand only the common ExecutorService behavior.

    Tip: The AbstractExecutorService class might make creating your own executor service easier.
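
    A minimal sketch of such a custom executor service might look like the following, assuming AbstractExecutorService as the base class and a wrapped single-thread executor doing the real work. The class name SingleTaskExecutorService, the busy flag, and the isBusy method are hypothetical names chosen for illustration. AbstractExecutorService supplies submit, which wraps the task in a Future and calls execute, so only execute and the lifecycle methods need to be written.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical executor service that runs one task at a time and rejects the rest.
class SingleTaskExecutorService extends AbstractExecutorService {
    private final ExecutorService delegate = Executors.newSingleThreadExecutor();
    private final AtomicBoolean busy = new AtomicBoolean(false);

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }
        // Claim the single slot, or reject as the ExecutorService contract allows.
        if (!busy.compareAndSet(false, true)) {
            throw new RejectedExecutionException("Already executing a task");
        }
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    busy.set(false); // Free the slot once the task body has returned.
                }
            });
        } catch (RejectedExecutionException e) {
            busy.set(false); // The delegate was shut down; release the slot.
            throw e;
        }
    }

    // Assumption: a convenience accessor, not part of the ExecutorService interface.
    public boolean isBusy() {
        return busy.get();
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }
}
```

    Callers would use the ordinary submit method and catch RejectedExecutionException when the service is busy. Note that in this sketch the flag is cleared just after the task body returns, so a caller that has only just seen Future#get return may, for a brief moment, still be rejected.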