javacallbacknotifyexecutor

Java executors: how to be notified, without blocking, when a task completes?


Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)


Solution

  • Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

    You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

    class CallbackTask implements Runnable {
    
      private final Runnable task;
    
      private final Callback callback;
    
      CallbackTask(Runnable task, Callback callback) {
        this.task = task;
        this.callback = callback;
      }
    
      public void run() {
        task.run();
        callback.complete();
      }
    
    }
    

    With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class GetTaskNotificationWithoutBlocking {
    
      public static void main(String... argv) throws Exception {
        ExampleService svc = new ExampleService();
        GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
        CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
        f.thenAccept(listener::notify);
        System.out.println("Exiting main()");
      }
    
      void notify(String msg) {
        System.out.println("Received message: " + msg);
      }
    
    }
    
    class ExampleService {
    
      String work() {
        sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
        char[] str = new char[5];
        ThreadLocalRandom current = ThreadLocalRandom.current();
        for (int idx = 0; idx < str.length; ++idx)
          str[idx] = (char) ('A' + current.nextInt(26));
        String msg = new String(str);
        System.out.println("Generated message: " + msg);
        return msg;
      }
    
      public static void sleep(long average, TimeUnit unit) {
        String name = Thread.currentThread().getName();
        long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
        System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
        try {
          unit.sleep(timeout);
          System.out.println(name + " awoke.");
        } catch (InterruptedException abort) {
          Thread.currentThread().interrupt();
          System.out.println(name + " interrupted.");
        }
      }
    
      public static long exponential(long avg) {
        return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
      }
    
    }