javathreadpoolthreadpoolexecutor

Java custom ThreadPool - pause task submission and cancel current queued tasks


I have the following scenario: a custom Threadpool needs to be able to be put on pause. When it is put on pause delete current queued tasks. During the time it is paused it can accept tasks and when resumed to execute the 'new' queued tasks. My current implementation is doing the above requirements. Threadpool implementation:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class QueriesThreadPoolExecutor extends ThreadPoolExecutor {
    private final List<Future<?>> futuresList = Collections.synchronizedList(new ArrayList<>());
    private boolean isPaused = false;
    private Lock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public QueriesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
//        System.out.println("beforeExecute " + t.getName());
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    @Override
    public void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
//        System.out.println("afterExecute " + Thread.currentThread().getName());
    }

    @Override
    public void terminated() {
        super.terminated();
        System.out.println("Thread pool terminated");
    }

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks = super.shutdownNow();

        System.out.println("Shutting down thread pool, active tasks remaining: " + tasks.size());
        return tasks;
    }

    public <T> Future<T> submit(Callable<T> task) {
        System.out.println("submit task");
        Future<T> res = super.submit(task);
        futuresList.add(res);

        return res;
    }

    public void pause() {
        cancelRunningTasks();

        System.out.println("pause");
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        System.out.println("resume");
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signal();
        } finally {
            pauseLock.unlock();
        }
    }

    public void cancelRunningTasks() {
        synchronized (futuresList) {
            for (Future<?> future : futuresList) {
                if (!future.isDone()) {
                    future.cancel(true);
                    System.out.println("cancelRunningTasks");
                }
            }
        }
    }
}

Unit test

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class QueriesThreadPoolExecutorTests {
    private static QueriesThreadPoolExecutor queriesThreadPoolExecutor;
    private AtomicInteger longCounter = new AtomicInteger(0);
    private AtomicInteger shortCounter = new AtomicInteger(0);

    @BeforeAll
    static void setup() {
        queriesThreadPoolExecutor = new QueriesThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @AfterAll
    static void tearDown() {
        queriesThreadPoolExecutor.shutdownNow();
    }

    @Test
    void testCancelResumeTasks() {
        Callable<String> longRunningTask = () -> {
            Thread.sleep(20_000);
            return "long running task" + longCounter.incrementAndGet();
        };

        Callable<String> shortRunningTask = () -> {
            Thread.sleep(1_000);
            return "short running task" + shortCounter.incrementAndGet();
        };
        List<Future<String>> futureList = new ArrayList<>();

        submitTasks(futureList, shortRunningTask, longRunningTask);

        ScheduledExecutorService cancelScheduledExecutor = Executors.newScheduledThreadPool(2);
        cancelScheduledExecutor.schedule(() -> {
            queriesThreadPoolExecutor.pause();
        }, 2, TimeUnit.SECONDS);

        printResults(futureList);

        submitTasks(futureList, shortRunningTask, longRunningTask);

        queriesThreadPoolExecutor.resume();

        printResults(futureList);
    }

    private void submitTasks(List<Future<String>> futureList, Callable<String> shortRunningTask, Callable<String> longRunningTask) {
        for (int i = 0; i < 5; i++) {
            futureList.add(queriesThreadPoolExecutor.submit(shortRunningTask));
        }
        for (int i = 0; i < 5; i++) {
            futureList.add(queriesThreadPoolExecutor.submit(longRunningTask));
        }
    }

    private void printResults(List<Future<String>> futureList) {
        System.out.println("***********Printing results*****************");
        for (Future<String> future : futureList) {
            try {
                String status = future.isCancelled() ? "cancelled" : "done";
                String result = future.isCancelled() ? "cancelled" : future.get();
                System.out.println("task status - " + status + " - result " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } catch (CancellationException e) {
                System.out.println("Exception - Task was cancelled");
            }
        }
    }
}

for some reason when the Threadpool is put on pause, one or more tasks are raising an Exception, from unit test:

task status - done - result short running task1
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
cancelRunningTasks
Exception - Task was cancelled  <--------------------WHY?
task status - cancelled - result cancelled
task status - cancelled - result cancelled
task status - cancelled - result cancelled
task status - cancelled - result cancelled
submit task
pause
submit task

Maybe it is something trivial which I'm missing.


Solution

  • So what is happening here:

    You have a pool of 5 threads and start 5 tasks in parallel.

    Short tasks take 1s and long tasks 20s. You also schedule cancelation in 2s.

    There are two interesting bits here, one is that printResults loops one task at a time and the other is that future.get() is blocking.

    Your print loop will get to that statement before any of the long tasks are canceled and start blocking for the first long running task.

    Meanwhile the cancelScheduledExecutor will kick in and cancel all of the long running tasks.

    The task that was currently blocking is now canceled and the exception will be thrown.

    The rest of the tasks will properly be checked on .isCancelled() and log out.