javamultithreadingexecutorservicerunnableinterrupted-exception

Creating a Self-Interrupting ExecutorService


I've got a task working over a directory of files which needs to throw an IOException if anything goes wrong. I also need it to go faster, so I'm splitting the work done into multiple threads and awaiting their termination. It looks something like this:

//Needs to throw IOException so the rest of the framework handles it properly.
public void process(File directory) throws IOException {
    ExecutorService executorService =
        new ThreadPoolExecutor(16, 16, Long.MAX_VALUE, TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());

    //Convenience class to walk over relevant file types.
    Source source = new SourceImpl(directory);
    while (source.hasNext()) {
        File file = source.next();
        executorService.execute(new Worker(file));
    }

    try {
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        throw new IOException("Worker thread had a problem!");
    }
}

While the Worker thread is basically:

private class Worker implements Runnable {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public void run() {
        try {
            //Do work
        } catch (IOException e) {
            Thread.currentThread().interrupt();
        }
    }
}

The desired behavior is that if any Worker has an IOException then the spawning thread is made aware of it and can in turn throw its own IOException. This was the best way I could think of to allow the Worker threads to signal an error, but I'm still not sure I set it up right.

So, first of all, will this do what I'm expecting? If a Worker thread has an error in run(), will calling Thread.currentThread().interrupt(); cause an InterruptedException to be thrown such that it's caught by the blocking executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);?

Secondly, what will happen if a running Worker calls its interrupt before all of the threads have been queued; before the blocking try/catch block?

Finally (and most importantly), is there any more elegant way to achieve my aim? I want to have all innumerable subthreads execute either until completion or until any one of them has an error, at which point I'd like to handle it in the spawning thread (by effectively failing the entire directory).


SOLUTION

Based on the answers, here's the implementation that I ended up using. It nicely handles my asynchronous desires and fails cleanly and relatively fast on IOExceptions.

public void process(File directory) throws IOException {
    //Set up a thread pool of 16 to do work.
    ExecutorService executorService = Executors.newFixedThreadPool(16);
    //Arbitrary file source.
    Source source = new SourceImpl(directory);
    //List to hold references to all worker threads.
    ArrayList<Callable<IOException>> filesToWork =
        new ArrayList<Callable<IOException>>();
    //Service to manage the running of the threads.
    ExecutorCompletionService<IOException> ecs =
        new ExecutorCompletionService<IOException>(executorService);

    //Queue up all of the file worker threads.
    while (source.hasNext())
        filesToWork.add(new Worker(file));

    //Store the potential results of each worker thread.
    int n = filesToWork.size();
    ArrayList<Future<IOException>> futures =
        new ArrayList<Future<IOException>>(n);

    //Prepare to return an arbitrary worker's exception.
    IOException exception = null;
    try {
        //Add all workers to the ECS and Future collection.
        for (Callable<IOException> callable : filesToWork)
            futures.add(ecs.submit(callable));
        for (int i = 0; i < n; i++) {
            try {
                //Get each result as it's available, sometimes blocking.
                IOException e = ecs.take().get();
                //Stop if an exception is returned.
                if (e != null) {
                    exception = e;
                    break;
                }
            //Also catch our own exceptions.
            } catch (InterruptedException e) {
                exception = new IOException(e);
                break;
            } catch (ExecutionException e) {
                exception = new IOException(e);
                break;
            }
        }
    } finally {
        //Stop any pending tasks if we broke early.
        for (Future<IOException> f : futures)
            f.cancel(true);
        //And kill all of the threads.
        executorService.shutdownNow();
    }

    //If anything went wrong, it was preserved. Throw it now.
    if (exception != null)
        throw exception;
}

And

//Does work, and returns (not throws) an IOException object on error.
private class Worker implements Callable<IOException> {
    private final File file;
    public Worker(File file) { this.file = file; }

    @Override
    public IOException call() {
        try {
            //Do work
        } catch (IOException e) {
            return e;
        }
        return null;
    }
}

Solution

  • Calling interrupt() like that will not affect the main thread.

    what you should do instead is make your worker a Callable instead of a Runnable and allow the failure exception to leave the call() method. Then, execute all your Workers using an ExecutorCompletionService. that will allow you to determine the status of each task and take action in the main thread if one of the tasks fails.