I've faced an issue in my application; an artificial example looks like this:
```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier

fun main(args: Array<String>) {
    val start = System.currentTimeMillis()
    Internal().doWork()
    println("Duration is ${(System.currentTimeMillis() - start) / 1000} sec")
}

class Internal {
    fun doWork() {
        val pool = ThreadPoolExecutor(
            3, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            ArrayBlockingQueue(1000),
        )
        val future = CompletableFuture.supplyAsync(
            {
                // 1st subtask
                val future1 = CompletableFuture.supplyAsync(
                    {
                        (1..10).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // 2nd subtask
                val future2 = CompletableFuture.supplyAsync(
                    {
                        (1..5).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // aggregate
                future1.join() + future2.join()
            },
            pool,
        )
        println(future.join())
    }

    class SingleExternalCall : Supplier<Int> {
        override fun get(): Int {
            Thread.sleep(5000)
            return counter.incrementAndGet().toInt()
        }
    }

    companion object {
        private val counter = AtomicLong()
    }
}
```
If you try to run it, the application hangs. The root cause is now clear to me. Initially 3 threads are created, and all three end up blocked in `join()` calls:

- `.sumOf { it.join() }` (subtask 1)
- `.sumOf { it.join() }` (subtask 2)
- `future1.join() + future2.join()` (aggregation)

All other tasks are queued, and because the task queue is quite long (much longer than the number of tasks in my example), no new threads are created. So the application has no threads left to do any useful work.

If we provide at least 4 threads as the core pool size, all the work gets done.
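A minimal sketch of that change (only the core pool size differs from the original pool construction):

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Core pool size 4 instead of 3: one thread for the outer task and one
// for each of the two subtasks (all three of which block in join()),
// plus one thread left over to actually run the queued leaf tasks.
fun makePool(): ThreadPoolExecutor = ThreadPoolExecutor(
    4, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    ArrayBlockingQueue(1000),
)

fun main() {
    val pool = makePool()
    println(pool.corePoolSize)  // 4
    pool.shutdown()
}
```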
Let's think about solutions:

- We can give more threads. But how many? At some point we will be limited by the number of threads.
- We can try to use `Executors.newCachedThreadPool()`. It will create a new thread every time an additional thread is needed (if there is no free thread inside).
- We can try to use `Executors.newWorkStealingPool()`. Conceptually, `ForkJoinPool` (which is used inside `newWorkStealingPool()`) looks like the best option because we have dependencies between tasks. But I've read some articles about `ForkJoinPool` and saw that there is a specific API with explicit `fork`/`join`, and it looks like using it would require rewriting the whole application (which I would like to avoid if possible).

So I want to know:

- Is it a good idea to use `ForkJoinPool`/`Executors.newWorkStealingPool()` in my example?
- If yes, what API is the best one?

---

> If we provide at least 4 threads as a core pool size - all work will be done
Yes, in that example, but with just four threads you've gone to a lot of trouble to get zero effective concurrency, because only one thread is available for doing any work other than waiting on other tasks. It would be cleaner and no less performant to just do the work serially. You would want more than four.
> We can give more threads. But how many? At some point we will be limited by the number of threads.
The maximum number that can make progress at any given time is the number of execution units in your machine. Since you expect three to be blocked most of the time, the number of execution units plus three seems like a reasonable upper bound. You might or might not want to back off from that a bit to reduce contention with the other things your computer is doing.
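That sizing heuristic is straightforward to express in code; `newFixedThreadPool` here is just one illustrative choice of pool, not something prescribed by the question:

```kotlin
import java.util.concurrent.Executors

// Upper bound suggested above: one thread per execution unit, plus the
// three threads expected to sit blocked in join() most of the time.
fun suggestedThreadCount(): Int =
    Runtime.getRuntime().availableProcessors() + 3

fun main() {
    val pool = Executors.newFixedThreadPool(suggestedThreadCount())
    println(suggestedThreadCount())
    pool.shutdown()
}
```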
> We can try to use `Executors.newCachedThreadPool()`. It will create a new thread every time an additional thread is needed (if there is no free thread inside)
Yes, but because your particular example generates all the tasks it wants to perform right up front, a cached thread pool will tend to generate a lot of threads to support them, possibly as many as there are tasks. This is among the things that a thread pool is intended to avoid.
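That thread-per-pending-task behaviour is easy to observe. This sketch (not from the question) submits 20 tasks that all block at once against a cached pool and reads back the pool size:

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor

// Submit `n` tasks that all block immediately. A cached pool hands off
// work through a queue with no capacity, so every submission that finds
// no idle worker spins up a fresh thread -- one per pending task.
fun burstPoolSize(n: Int): Int {
    val pool = Executors.newCachedThreadPool() as ThreadPoolExecutor
    val latch = CountDownLatch(1)
    repeat(n) { pool.execute { latch.await() } }
    val size = pool.poolSize  // one thread per blocked task
    latch.countDown()
    pool.shutdown()
    return size
}

fun main() {
    println(burstPoolSize(20))  // 20
}
```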
> We can try to use `Executors.newWorkStealingPool()`
Yes, and this has the advantage that, if used properly, the `join()`s will not prevent the threads executing them from performing other available work. This is a reasonable approach in support of tasks that create other tasks and must await their completion. Indeed, that's among its core use cases. Note, however, that this does not free you from considering what degree of concurrency you want to achieve. A `ForkJoinPool` such as `newWorkStealingPool()` provides can choose a plausible default for you, but only you can determine how appropriate that default is, or whether you would prefer something different.
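For illustration, here is a cut-down version of the question's structure running on `Executors.newWorkStealingPool()` with the same parallelism of 3 that starved the fixed pool. One assumption worth flagging: on recent JDKs, `CompletableFuture.join()` called from a `ForkJoinPool` worker goes through the pool's managed-blocking machinery, so the pool can add compensation threads rather than starve; verify this on your JDK before relying on it.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Parallelism 3 -- the same thread budget that deadlocked the original
// ThreadPoolExecutor-based version.
fun runNested(): Int {
    val pool = Executors.newWorkStealingPool(3)
    val outer = CompletableFuture.supplyAsync({
        val leaves = (1..10).map {
            CompletableFuture.supplyAsync({ Thread.sleep(50); 1 }, pool)
        }
        leaves.sumOf { it.join() }  // blocks, but the pool compensates
    }, pool)
    return outer.join()
}

fun main() {
    println(runNested())  // completes instead of hanging
}
```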
> there is a specific API with explicit `fork`/`join`, and it looks like using it would require rewriting the whole application
Probably so. To make use of work stealing, you need to submit your tasks as `ForkJoinTask`s (though probably more specifically as instances of one of that class's subclasses), and you will want those tasks to `fork()` their subtasks instead of submitting them directly to the pool.
If it's going to be disruptive to implement that then that's a good reason to hesitate. It might still be, however, that that's a change worth making.
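As a rough sketch of what such a rewrite could look like, here is one of the question's subtasks recast as a `RecursiveTask` that `fork()`s its leaf tasks. `SubtaskSum` is a name invented for this example, and the 5-second external call is shortened to 50 ms:

```kotlin
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask

// One subtask from the question, recast in the fork/join style: fork()
// the leaf tasks, then join() them. Inside a ForkJoinPool, join() lets
// the calling worker help with other queued work instead of pinning a
// thread, which is what removes the starvation problem.
class SubtaskSum(private val leafCount: Int) : RecursiveTask<Int>() {
    override fun compute(): Int {
        val leaves = (1..leafCount).map {
            object : RecursiveTask<Int>() {
                override fun compute(): Int {
                    Thread.sleep(50)  // stand-in for the external call
                    return 1
                }
            }.fork()
        }
        return leaves.sumOf { it.join() }
    }
}

fun main() {
    val pool = ForkJoinPool(3)
    println(pool.invoke(SubtaskSum(10)))  // 10
}
```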
> Is it a good idea to use `ForkJoinPool`/`Executors.newWorkStealingPool()` in my example?
Your particular example is structured in a way that is well suited to a `ForkJoinPool`, and using such a pool appropriately should solve your blocking and parallelism problems. It's a good idea in that sense.

But the structure of your example is also simple enough and clear enough that you could reasonably solve those same problems by choosing a larger number of threads for a fixed thread pool. Were I writing that example from scratch, I would probably go with the `ForkJoinPool`, as it seems better matched, but it's not clear whether the same analysis would apply to your real application, nor whether it would be worth the effort to modify your existing real application to make effective use of a `ForkJoinPool`.
> If yes - what API is the best one?
We generally don't answer "best" questions here, as it tends both to be a matter of opinion and to be situational. Hopefully my remarks above give you a better basis to judge for yourself.