java concurrency executorservice threadpoolexecutor

Iterating a folder structure in parallel with Files.newDirectoryStream, waiting for ExecutorService with recursive task submission to complete


I am using Files.newDirectoryStream to iterate a directory structure recursively. I want to use an ExecutorService to walk the directory in parallel (the files are on a slow network connection, and latency leaves our original single-threaded search blocked most of the time).

If the found Path is a directory, I submit a new task to an executor, recursively:

ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

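public void walkDirectory(Path directory) {
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
        for (Path entry : stream) {
            DosFileAttributes attribs = getBasicFileAttributes(entry);
            if (attribs.isDirectory()) {
                // Each subdirectory becomes its own task on the same executor
                executorService.submit(() -> walkDirectory(entry));
            } else {
                process(entry, attribs);
            }
        }
    } catch (IOException e) {
        // newDirectoryStream and the attribute read throw a checked IOException
        throw new UncheckedIOException(e);
    }
}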

I now need to block until the iteration is complete.

The naive approach of calling executorService.shutdown() and then awaitTermination() does not work, because the traversal needs to keep adding new jobs to the service as it walks the tree. Also note that the executor's own worker threads need to submit jobs to that same executor service.

CountDownLatch doesn't work because we have no idea how many tasks are going to be submitted in advance.

Does anyone have suggestions on how to wait for the traversal to finish without preventing new jobs from being added to the executor service?

Or is there a better/more elegant way to achieve this parallelization goal?

PS - I'm already thinking that I should configure the executor service with a bounded queue and a CallerRunsPolicy rejection policy so I don't wind up with billions of queued tasks... Any best practices would be greatly appreciated.
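
A minimal sketch of the bounded-queue configuration mentioned in the PS, assuming an ArrayBlockingQueue as the work queue. The class name, the helper newBoundedExecutor and the runTasks demo are hypothetical and only illustrate the idea: once the queue is full, CallerRunsPolicy makes the submitting thread run the task itself, which throttles submission instead of growing the backlog.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedExecutorSketch {

    // Hypothetical helper: fixed-size pool whose backlog is capped at queueCapacity.
    static ExecutorService newBoundedExecutor(int numThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                numThreads, numThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Demo: submits taskCount trivial tasks and returns how many actually ran.
    static int runTasks(int numThreads, int queueCapacity, int taskCount)
            throws InterruptedException {
        ExecutorService executor = newBoundedExecutor(numThreads, queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(completed::incrementAndGet);
        }
        executor.shutdown();                              // already queued tasks still run
        executor.awaitTermination(30, TimeUnit.SECONDS);
        return completed.get();
    }
}
```

No task is dropped in this setup: each one runs either on a pool thread or on the thread that submitted it.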


Here's the latest strategy that we are testing:

I create an AtomicInteger counter that keeps track of the number of directories that are queued or currently being walked. When that counter drops to zero, we know that the traversal is complete.

We increment the counter before submitting a new directory to the executor service, and we decrement as the task finishes.

This appears to be working as desired.

I'm using raw synchronized constructs for the notifications - not sure if there is a more elegant way to handle that.

If anyone has feedback, I'd appreciate hearing it.

public class FolderWalker {

    private final ExecutorService executorService;
    private final PathProcessor processor;
    private final PathFilter fileFilter;
    private final PathFilter directoryFilter;
    private final ExceptionHandler exceptionHandler;
    private final AtomicInteger activeThreads = new AtomicInteger(0);
    private final Object lock = new Object();
    
    // volatile: written by one worker thread and read by the others
    private volatile boolean keepRunning = true;

    public static interface PathProcessor{
        public void process(Path p, DosFileAttributes attribs) throws Exception;
    }
    
    public static interface ExceptionHandler{
        public boolean handle(Path p, Exception e); // return true to continue traversal
    }
    
    public static interface PathFilter{
        public boolean accept(Path p, DosFileAttributes attribs);
        
        public static PathFilter ACCEPT_ALL = (p, attribs) -> true;
    }

    public FolderWalker(int numThreads, PathFilter directoryFilter, PathFilter fileFilter, PathProcessor processor, ExceptionHandler exceptionHandler) {
        this.executorService = createExecutorService(numThreads);
        this.directoryFilter = directoryFilter;
        this.fileFilter = fileFilter;
        this.processor = processor;
        this.exceptionHandler = exceptionHandler;
    }
    
    private static ExecutorService createExecutorService(int numThreads) {
        
        // if threads are all in use, then the calling thread has to do the work - this will keep us from having excessive memory overhead of submitted jobs
        return new ThreadPoolExecutor(numThreads, numThreads, 0, TimeUnit.MILLISECONDS, 
                  new SynchronousQueue<>(), 
                  new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void walkDirectory(Path directory) {
        activeThreads.incrementAndGet();
        executorService.submit(() -> walkDirectoryInternal(directory));
    }
    
    private void walkDirectoryInternal(Path directory) {
        int activeThreadCount = activeThreads.get();
        System.out.println(Thread.currentThread() + " - Before: Looking in " + directory + " - Active folders: " + activeThreadCount);
        
        try {
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
                for (Path entry : stream) {
                    // TODO: How do we want to handle exceptions?
                    //if (thrownException != null) return;
                    if (Thread.interrupted()) return;
                    if (!keepRunning) return;
                    
                    try {
                        DosFileAttributes attribs =  getBasicFileAttributes(entry);
                        if (attribs.isDirectory() && directoryFilter.accept(entry, attribs)) {
                            activeThreads.incrementAndGet();
                            executorService.submit(() -> walkDirectoryInternal(entry));
                        } else if (fileFilter.accept(entry, attribs)) {
                            processor.process(entry, attribs);
                        }
                    } catch (Exception e) {
                        if (!exceptionHandler.handle(entry, e))
                            keepRunning = false;
                    }
                }
            } catch (Exception e) {
                if (!exceptionHandler.handle(directory, e))
                    keepRunning = false;
            }
        } finally {
            activeThreadCount = activeThreads.decrementAndGet();
            System.out.println(Thread.currentThread() + " - After: Looking in " + directory + " - Active folders: " + activeThreadCount);
            if (activeThreadCount == 0) {
                synchronized(lock) {
                    lock.notifyAll();
                }
            }
        }           
            
    }

    private DosFileAttributes getBasicFileAttributes(Path p) throws IOException {
        return Files.getFileAttributeView(p, DosFileAttributeView.class).readAttributes();
    }
    
    public void await() throws Exception {
        synchronized(lock) {
            while (activeThreads.get() != 0)
                lock.wait();
        }
        
        executorService.shutdown();
        System.out.println("Thread pool is shut down");
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
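        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // restore the interrupt flag so callers can still see the interruption
            Thread.currentThread().interrupt();
        }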
        System.out.println("Thread pool is terminated");
        
    }

}
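
On the question of a more elegant alternative to the raw synchronized/wait/notify handshake: java.util.concurrent.Phaser allows parties to be registered dynamically, so it can play the role of both the counter and the lock. The following is only a sketch under assumptions. The class name is hypothetical, and the "directory tree" is simulated by a depth and a fan-out rather than real paths. Note also that a single Phaser supports at most 65535 registered parties at a time, so for very wide trees the AtomicInteger approach above (or tiered phasers) may still be the safer choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserTraversalSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final Phaser phaser = new Phaser(1);   // the one initial party is the waiting caller
    private final AtomicInteger visited = new AtomicInteger();

    // Register the task with the phaser BEFORE handing it to the pool,
    // mirroring the increment-before-submit rule of the counter version.
    private void submit(int depth, int fanout) {
        phaser.register();
        executor.execute(() -> visit(depth, fanout));
    }

    // Simulated directory visit: a node at `depth` has `fanout` children.
    private void visit(int depth, int fanout) {
        try {
            visited.incrementAndGet();
            if (depth > 0) {
                for (int i = 0; i < fanout; i++) {
                    submit(depth - 1, fanout);
                }
            }
        } finally {
            phaser.arriveAndDeregister();          // this task is finished
        }
    }

    // Walks the simulated tree and returns the number of nodes visited.
    int walk(int depth, int fanout) {
        submit(depth, fanout);
        phaser.arriveAndAwaitAdvance();            // blocks until every registered task has arrived
        executor.shutdown();
        return visited.get();
    }
}
```

Because children are registered before their parent deregisters, the phase cannot advance while any work is still outstanding.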

Solution

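  • A Future may help here.

    When you encounter a directory, submit a task and add the returned Future to a queue.
    After the initial call to the recursive method (walkDirectory) returns, drain the queue in a loop, calling get() on each Future until the queue is empty.

    Important notes: the queue must be thread-safe (a ConcurrentLinkedQueue here) because worker threads add Futures to it concurrently. The loop ends at the right time because each task enqueues the Futures of its subdirectories before it finishes, so by the time get() returns for a task, all of its children are already in the queue.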

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Queue<Future<?>> queue = new ConcurrentLinkedQueue<>();
    
    public void walkWholeDir(Path directory) throws ExecutionException, InterruptedException {
        walkDirectory(directory);
        System.out.println("Waiting for all futures to complete...");
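        // Drain the queue: get() blocks until that task is done, and any
        // subdirectory tasks it spawned have been queued by then.
        Future<?> future;
        while ((future = queue.poll()) != null) {
            future.get();
        }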
        System.out.println("All futures are completed");
    }
    
    private void walkDirectory(Path directory) {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) {
            for (Path entry : stream) {
                BasicFileAttributes attribs =  getBasicFileAttributes(entry);
                if (attribs.isDirectory()) {
                    Future<?> future = executorService.submit(() -> walkDirectory(entry));
                    queue.add(future);
                } else {
                    process(entry, attribs);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
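
For anyone who wants to try the Future-queue approach end to end, here is a self-contained sketch under a few assumptions: the class name, the file-counting "process" step and the createSampleTree helper are hypothetical stand-ins for the process and getBasicFileAttributes methods that the snippet above leaves undefined. It also shuts the pool down when the walk is finished, which the snippet above does not do.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class FutureQueueWalker {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private final Queue<Future<?>> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger fileCount = new AtomicInteger();

    // Walks the tree under root in parallel and returns the number of regular entries found.
    int countFiles(Path root) throws ExecutionException, InterruptedException {
        try {
            walk(root);
            Future<?> f;
            // Each get() returns only after that task has queued its own subdirectories.
            while ((f = pending.poll()) != null) {
                f.get();
            }
            return fileCount.get();
        } finally {
            executor.shutdown();
        }
    }

    private void walk(Path dir) {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    pending.add(executor.submit(() -> walk(entry)));
                } else {
                    fileCount.incrementAndGet();   // stand-in for process(entry, attribs)
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical helper: builds a temporary tree containing three files.
    static Path createSampleTree() throws IOException {
        Path root = Files.createTempDirectory("walker-demo");
        Path sub = Files.createDirectories(root.resolve("a").resolve("b"));
        Files.createFile(root.resolve("top.txt"));
        Files.createFile(root.resolve("a").resolve("mid.txt"));
        Files.createFile(sub.resolve("leaf.txt"));
        return root;
    }
}
```

A failure inside any subdirectory task surfaces from get() as an ExecutionException, so the caller finds out about errors instead of having them silently swallowed by submit().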