I have a list of work units and I want to process them in parallel. Each unit takes 8-15 seconds of pure computation, with no blocking I/O. What I want to achieve is to have an ExecutorService
that:
Something like:
Queue<WorkResult> queue = new ConcurrentLinkedDeque<>();
ExecutorService service = ....
for (WorkUnit unit : list) {
    service.submit(() -> {
        .. do some work ..
        queue.offer(result);
    });
}
while (queue.peek() != null) {
    ... process results while they arrive ...
}
What I tried with no success is:
- newCachedThreadPool(), which creates too many threads
- new ThreadPoolExecutor(0, 20, 60L, SECONDS, new SynchronousQueue<>()), but then I noticed that submit() is blocking due to the synchronous queue
- new LinkedBlockingQueue() instead, just to find out that the ThreadPoolExecutor spawns only one thread

I'm sure there is an official implementation to handle this very basic use case of concurrency. Can someone advise?
Create the ThreadPoolExecutor using a LinkedBlockingQueue and 20 as corePoolSize (first argument in the constructor):
new ThreadPoolExecutor(20, 20, 60L, SECONDS, new LinkedBlockingQueue<>());
If you use the LinkedBlockingQueue without a predefined capacity, the Pool:
- never takes maxPoolSize into account.
- never creates more threads than corePoolSize's specified number.

In your case, only one thread will be executed. And you're lucky to get one, since you set corePoolSize to 0 and previous versions of Java (<1.6) wouldn't create any if the corePoolSize was set to 0 (how dare they?).
Later versions do create a new thread even if the corePoolSize is 0, which seems like ... a fix that is ... a bug that ... changes ... a logical behaviour?
Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.)
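The effect is easy to observe. The following is a minimal sketch, not part of the original answer: the class name UnboundedQueueDemo, the method observe, and the sizes used (2 core threads, a maximum of 10, 8 tasks) are illustrative assumptions. Every task parks on a latch so that the pool cannot drain the queue while it is being measured.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    /** Returns {pool size, queued task count}, measured while every submitted task is still pending. */
    static int[] observe(int corePoolSize, int maximumPoolSize, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // no capacity given, so effectively unbounded
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();     // workers exit once the queue is drained
        }
    }

    public static void main(String[] args) {
        int[] observed = observe(2, 10, 8);
        System.out.println("threads=" + observed[0] + ", queued=" + observed[1]);
    }
}
```

With these numbers the pool stops at 2 threads and the other 6 tasks wait in the queue, even though the maximum is 10. Calling observe(0, 20, 5) reports a single thread, which matches what the question describes.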
About scaling down
To remove all threads when there is no work to do, you have to let the core threads time out explicitly (they don't terminate by default). To achieve this, call allowCoreThreadTimeOut(true) before starting the Pool.
Take care to set a sensible keep-alive timeout: for example, if a new task arrives on average every 6 seconds, setting the keep-alive time to 5 seconds could lead to unnecessary destroy-and-create cycles (oh dear thread, you just had to wait one more second!). Set this timeout based on the rate at which tasks arrive.
Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive. When false, core threads are never terminated due to lack of incoming tasks. When true, the same keep-alive policy applying to non-core threads applies also to core threads. To avoid continual thread replacement, the keep-alive time must be greater than zero when setting true. This method should in general be called before the pool is actively used.
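A minimal sketch of the scale-down behaviour, assuming a deliberately short 200 ms keep-alive so the effect shows up quickly. The class ScaleDownDemo and the method poolSizeAfterIdle are hypothetical names chosen for illustration, and the empty task body stands in for real work.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScaleDownDemo {

    /** Runs some trivial tasks, then returns the pool size after idle threads had time to expire. */
    static int poolSizeAfterIdle(boolean allowCoreTimeout, int threads, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 200L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout); // configured before the pool is used
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> { /* stand-in for real work */ });
            }
            // Wait up to 2 seconds (10x the keep-alive) for the pool to empty.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(50);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with core timeout:    " + poolSizeAfterIdle(true, 4, 4));
        System.out.println("without core timeout: " + poolSizeAfterIdle(false, 4, 4));
    }
}
```

On a typical run the first call reports 0 threads and the second reports 4, since core threads only expire when the flag is set. With real 8-15 second work units a keep-alive in the range of tens of seconds is probably more appropriate than 200 ms.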
TL;DR
- LinkedBlockingQueue as task queue.
- corePoolSize replacing maxPoolSize's meaning.
- allowCoreThreadTimeOut(true) in order to allow the Pool to scale down using a timeout-based mechanism that also affects the core threads.
- keep-alive value set to something logical based on the task arrival rate.

This fresh mix will lead to an ExecutorService that 99.99999% of the time accepts a task without blocking the submitter (the queue would first have to hold 2,147,483,647 tasks, and even then ThreadPoolExecutor rejects the task rather than blocking), and that efficiently scales the number of threads according to the work load, fluctuating (in both directions) between { 0 <--> corePoolSize } concurrent threads.
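Put together, the recipe could look like the sketch below. The names ScalingPools, newScalingPool and squareAll are hypothetical, and squaring a number is only a stand-in for the 8-15 second work units from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ScalingPools {

    /** Pool that grows to maxThreads under load and shrinks to zero after keepAliveSeconds of idleness. */
    static ThreadPoolExecutor newScalingPool(int maxThreads, long keepAliveSeconds) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,        // corePoolSize acts as the upper bound
                keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());  // unbounded task queue
        pool.allowCoreThreadTimeOut(true);     // idle core threads may expire too
        return pool;
    }

    /** Squares each input on the pool and returns the results in submission order. */
    static List<Long> squareAll(List<Integer> inputs, int maxThreads) {
        ThreadPoolExecutor pool = newScalingPool(maxThreads, 60L);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int n : inputs) {
                Callable<Long> unit = () -> (long) n * n; // placeholder work unit
                futures.add(pool.submit(unit));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> future : futures) {
                results.add(future.get()); // blocks until that unit is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a work unit failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, squareAll(Arrays.asList(1, 2, 3, 4), 2) returns [1, 4, 9, 16]. Collecting futures yields results in submission order; if results should be handled in completion order instead, java.util.concurrent.ExecutorCompletionService wrapped around the same pool is one option.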
As a suggestion, the queue's size should be monitored, as the non-blocking behaviour has a price: the risk of an OutOfMemoryError if the queue keeps growing without control, until Integer.MAX_VALUE is met (for example, if the threads are deadlocked for an entire day while the submitters keep inserting tasks). Even if each task is small in memory, 2,147,483,647 objects with their corresponding link wrappers, etc. is a lot of extra load.
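One simple way to keep an eye on the backlog is to read the size of the executor's queue. This is only a sketch: QueueMonitor, isBacklogged and backlogAfterSubmitting are hypothetical names, and the threshold is an assumption to be tuned for the application.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueMonitor {

    /** True when more than maxQueued tasks are waiting for a free thread. */
    static boolean isBacklogged(ThreadPoolExecutor pool, int maxQueued) {
        return pool.getQueue().size() > maxQueued;
    }

    /** Demo helper: submits blocked tasks and reports whether the backlog exceeds maxQueued. */
    static boolean backlogAfterSubmitting(int threads, int tasks, int maxQueued) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate threads that are stuck
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return isBacklogged(pool, maxQueued);
        } finally {
            release.countDown(); // unblock the tasks so the pool can drain
            pool.shutdown();
        }
    }
}
```

With 1 thread and 5 stuck tasks, 4 tasks sit in the queue, so backlogAfterSubmitting(1, 5, 3) is true and backlogAfterSubmitting(1, 5, 4) is false. A submitter could run such a check before handing over more work, or a scheduled task could log the queue size periodically.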