Tags: java, multithreading, java.util.concurrent

ExecutorService that dynamically scales the number of threads


I have a list of work units and I want to process them in parallel. Each unit takes 8-15 seconds of pure computation, with no I/O blocking. What I want is an ExecutorService that:

Something like:

Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for(WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    });
}
while(queue.peek() != null) {
    ... process results while they arrive ...
}

What I tried with no success is:

I'm sure there is an official implementation that handles this very basic concurrency use case. Can someone advise?


Solution

  • Create the ThreadPoolExecutor using a LinkedBlockingQueue and 20 as corePoolSize (first argument in the constructor):

    new ThreadPoolExecutor(20, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());


    If you use the LinkedBlockingQueue without a predefined capacity, the Pool:

    In your case, only one thread will ever run. And you're lucky to get even one, since you set corePoolSize to 0, and earlier versions of Java (<1.6) wouldn't create any thread when corePoolSize was 0 (how dare they?).

    Later versions do create a new thread even if corePoolSize is 0, which seems like... a fix that is... a bug that... changes... a logical behaviour?

    Thread Pool Executor

    Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)
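The quoted rule can be checked with a small experiment. The following is a minimal sketch, not part of the original answer: the class name `UnboundedQueueDemo` and the helper `largestPoolSize` are made up for illustration, and the tasks simply block on a latch to simulate busy workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    // Hypothetical helper: submits `tasks` blocking tasks to a pool backed by an
    // unbounded LinkedBlockingQueue and reports the largest pool size reached.
    static int largestPoolSize(int corePoolSize, int maximumPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // no capacity given: unbounded
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();     // queued tasks still run, then threads exit
        }
    }

    public static void main(String[] args) {
        // maximumPoolSize = 16 is never reached because the queue never fills up
        System.out.println(largestPoolSize(4, 16, 100)); // prints 4
        System.out.println(largestPoolSize(0, 16, 100)); // prints 1
    }
}
```

The second call mirrors the corePoolSize = 0 situation described above: on current Java versions a single thread is started and everything else waits in the queue.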


    About scaling down

    To remove all threads when there is no work to do, you have to let the core threads terminate explicitly (they don't by default). To do so, call allowCoreThreadTimeOut(true) before you start using the pool.

    Take care to choose a sensible keep-alive timeout: for example, if a new task arrives every 6 seconds on average, a keep-alive time of 5 seconds could lead to unnecessary destroy-and-create cycles (oh dear thread, you just had to wait one more second!). Base this timeout on the rate at which tasks arrive.

    allowCoreThreadTimeOut

    Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive. When false, core threads are never terminated due to lack of incoming tasks. When true, the same keep-alive policy applying to non-core threads applies also to core threads. To avoid continual thread replacement, the keep-alive time must be greater than zero when setting true. This method should in general be called before the pool is actively used.
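Putting the two pieces together, a pool that grows up to a fixed size and shrinks back to zero when idle might look like the sketch below. The names `ScalingPoolDemo`, `newScalingPool` and `busyAndIdleSizes` are hypothetical, and the short sleeps and the 200 ms keep-alive are chosen only so the demo finishes quickly; real values should follow the task arrival rate as discussed above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScalingPoolDemo {

    // Hypothetical factory: at most `maxThreads` workers, all of which may time out.
    static ThreadPoolExecutor newScalingPool(int maxThreads, long keepAlive, TimeUnit unit) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, keepAlive, unit, new LinkedBlockingQueue<>());
        // Requires keepAlive > 0, otherwise this call throws IllegalArgumentException.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns { pool size right after submitting, pool size after the idle period }.
    static int[] busyAndIdleSizes() {
        ThreadPoolExecutor pool = newScalingPool(4, 200, TimeUnit.MILLISECONDS);
        try {
            for (int i = 0; i < 10; i++) {
                pool.execute(() -> sleep(50)); // stand-in for real computation
            }
            int busy = pool.getPoolSize();
            // Poll (for at most 5 seconds) until the idle workers have timed out.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                sleep(50);
            }
            return new int[] { busy, pool.getPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = busyAndIdleSizes();
        System.out.println("busy=" + sizes[0] + ", idle=" + sizes[1]);
    }
}
```

With these settings the pool should report 4 threads while work is pending and 0 once the keep-alive time has elapsed without new tasks.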


    TL;DR

    This mix gives you an ExecutorService that in practice never blocks the submitter: ThreadPoolExecutor enqueues tasks with a non-blocking offer(), so a submission could only fail (it would be rejected rather than block) once 2,147,483,647 tasks are queued. It also scales the number of threads efficiently with the workload, fluctuating in both directions between 0 and corePoolSize concurrent threads.

    As a suggestion, monitor the queue's size, because the non-blocking behaviour has a price: if the queue keeps growing unchecked towards Integer.MAX_VALUE (for example, if the worker threads are deadlocked for an entire day while submitters keep adding tasks), you risk an OutOfMemoryError. Even if each task is small in memory, 2,147,483,647 objects plus their linked-node wrappers are a lot of extra load.
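One way to keep an eye on the backlog is to read the size of the pool's work queue. The sketch below is only an illustration: `QueueMonitorDemo`, `isBacklogged` and `backlogWithOneStuckWorker` are invented names, the threshold is arbitrary, and the stuck worker is simulated with a latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueMonitorDemo {

    // Hypothetical check: true when more than `threshold` tasks are waiting.
    static boolean isBacklogged(ThreadPoolExecutor pool, int threshold) {
        return pool.getQueue().size() > threshold;
    }

    // Simulates a stuck worker: a single thread blocks on the first task,
    // so every later submission piles up in the unbounded queue.
    static int backlogWithOneStuckWorker(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // blocks until the demo releases it
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            if (isBacklogged(pool, 10)) {
                System.out.println("warning: " + pool.getQueue().size() + " tasks waiting");
            }
            return pool.getQueue().size();
        } finally {
            release.countDown(); // unblock the worker so the pool can drain
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The first task goes straight to the worker; the other 49 are queued.
        System.out.println(backlogWithOneStuckWorker(50)); // prints 49
    }
}
```

In a real application a check like `isBacklogged` could be called periodically, for instance from a scheduled task, to log or raise an alert before memory becomes a problem.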