java multithreading kotlin forkjoinpool work-stealing

How to use `ForkJoinPool`/`Executors.newWorkStealingPool()` to achieve better performance?


I've run into an issue in my application. An artificial example looks like this:

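import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier

fun main(args: Array<String>) {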
    val start = System.currentTimeMillis()
    Internal().doWork()
    println("Duration is ${(System.currentTimeMillis() - start)/1000} sec")
}

class Internal {

    fun doWork() {

        val pool = ThreadPoolExecutor(
            3, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            ArrayBlockingQueue(1000),
        )

        val future = CompletableFuture.supplyAsync(
            {
                // 1 subtask
                val future1 = CompletableFuture.supplyAsync(
                    {
                        (1..10).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // 2 subtask
                val future2 = CompletableFuture.supplyAsync(
                    {
                        (1..5).map {
                            CompletableFuture.supplyAsync(SingleExternalCall(), pool)
                        }.sumOf { it.join() }
                    },
                    pool,
                )
                // aggregate
                future1.join() + future2.join()
            },
            pool,
        )
        println(future.join())
    }

    class SingleExternalCall : Supplier<Int> {

        override fun get(): Int {
            Thread.sleep(5000)
            return counter.incrementAndGet().toInt()
        }
    }

    companion object {

        private val counter = AtomicLong()
    }
}

If you try to run it, the application will hang. The root cause is now clear to me. Initially 3 threads are created, and all three end up blocked in join(): one runs the outer task and the other two run the subtasks.

All other tasks are queued, but because the task queue is quite long (much longer than the number of tasks in my example), new threads are never created. So the application has no free threads left to do useful work.
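
The queueing behaviour described above can be checked in isolation. The following is only a minimal sketch: the helper name `threadsCreated` and the latch used to keep tasks blocked are illustrative assumptions, not part of the application. It relies on the documented `ThreadPoolExecutor` rule that threads beyond the core size are started only when the queue rejects a task.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: submits `tasks` blocking tasks to a pool configured like
// the one in the example and reports how many threads the pool has created.
fun threadsCreated(queueCapacity: Int, tasks: Int): Int {
    val pool = ThreadPoolExecutor(
        3, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        ArrayBlockingQueue(queueCapacity),
    )
    val release = CountDownLatch(1)
    try {
        // Every task blocks until released, so no thread becomes free again.
        repeat(tasks) { pool.execute { release.await() } }
        return pool.poolSize
    } finally {
        release.countDown()
        pool.shutdown()
    }
}

fun main() {
    // The queue never fills, so the pool stays at its 3 core threads.
    println(threadsCreated(queueCapacity = 1000, tasks = 10))
    // The queue fills after 2 tasks, so extra threads are started for the rest.
    println(threadsCreated(queueCapacity = 2, tasks = 10))
}
```

With the long queue the pool never grows past its core size, which matches the hang: the three core threads wait on tasks that sit in the queue forever.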

If we provide at least 4 threads as the core pool size, all the work will be done.

Let's think about solutions:

  1. We can give it more threads. But how many? At some point we will be limited by the number of threads.
  2. We can try to use Executors.newCachedThreadPool(). It will create a new thread every time we need an additional one (if there is no free thread in the pool).
  3. We can try to use Executors.newWorkStealingPool().

Conceptually, ForkJoinPool (which is used inside newWorkStealingPool) looks like the best fit because we have dependencies between tasks. But I've read some articles about ForkJoinPool and see that there is some specific API with explicit fork/join, and it looks like it will require rewriting the whole application (I would like to avoid that if possible).

So I want to know:

  1. Is it a good idea to use ForkJoinPool/Executors.newWorkStealingPool() in my example?
  2. If yes, which API is the best one?

Solution

  • If we provide at least 4 threads as the core pool size, all the work will be done.

    Yes, in that example, but with just four threads you've gone to a lot of work and mess to get zero effective concurrency, because only one is available for doing any work other than waiting on other tasks. It would be cleaner and no less performant to just do the work serially. You would want more than four.

    1. We can give it more threads. But how many? At some point we will be limited by the number of threads.

    The maximum number that can make progress at any given time is the number of execution units in your machine. Since you expect three to be blocked most of the time, the number of execution units plus three seems like a reasonable upper bound. You might or might not want to back off from that a bit to reduce contention with the other things your computer is doing.
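
    As a rough illustration of that upper bound, the sketch below sizes a fixed pool as "execution units plus the three blocked threads" and runs a scaled-down version of the question's structure on it. The names `suggestedPoolSize` and `runNested` are hypothetical, and the 50 ms sleep merely stands in for the external call.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

// Hypothetical helper: one thread per execution unit, plus the threads
// that are expected to sit blocked in join() most of the time.
fun suggestedPoolSize(blockedThreads: Int = 3): Int =
    Runtime.getRuntime().availableProcessors() + blockedThreads

// Scaled-down stand-in for the question's example. Each leaf "call" sleeps
// briefly and returns 1, so the result is simply the number of calls made.
fun runNested(poolSize: Int): Int {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
        fun subtask(calls: Int): CompletableFuture<Int> = CompletableFuture.supplyAsync<Int>(
            {
                (1..calls).map {
                    CompletableFuture.supplyAsync<Int>({ Thread.sleep(50); 1 }, pool)
                }.sumOf { it.join() }
            },
            pool,
        )
        val outer = CompletableFuture.supplyAsync<Int>(
            {
                val first = subtask(10)
                val second = subtask(5)
                first.join() + second.join()
            },
            pool,
        )
        return outer.join()
    } finally {
        pool.shutdown()
    }
}

fun main() {
    val size = suggestedPoolSize()
    println("pool size $size, result ${runNested(size)}")
}
```

    Because the size is always at least four, the three waiting tasks can never occupy every thread, and the remaining threads work through the leaf calls.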

    2. We can try to use Executors.newCachedThreadPool(). It will create a new thread every time we need an additional one (if there is no free thread in the pool).

    Yes, but because your particular example generates all the tasks it wants to perform right up front, a cached thread pool will tend to generate a lot of threads to support them, possibly as many as there are tasks. This is among the things that a thread pool is intended to avoid.

    3. We can try to use Executors.newWorkStealingPool().

    Yes, and this has the advantage that, if used properly, the join()s will not prevent the threads executing them from performing other available work. This is a reasonable approach in support of tasks that create other tasks and must await their completion. Indeed, that's among its core use cases. Note, however, that this does not free you from considering what degree of concurrency you want to achieve. A ForkJoinPool such as your newWorkStealingPool() will provide can choose a plausible default for you, but only you can determine how appropriate that default is, or whether you prefer something different.
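
    To see which default you would get, and how to override it, something like the following sketch may help. The helper `parallelismOf` is hypothetical, and the cast assumes that the returned executor is in fact a ForkJoinPool; the declared return type is only ExecutorService, so the helper returns null otherwise.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool

// Hypothetical helper: the target parallelism if the executor is a
// ForkJoinPool, or null if it is some other kind of executor.
fun parallelismOf(executor: ExecutorService): Int? =
    (executor as? ForkJoinPool)?.parallelism

fun main() {
    // No argument: the target parallelism is the number of available processors.
    val byDefault = Executors.newWorkStealingPool()
    // Explicit argument: you choose the target parallelism yourself.
    val explicit = Executors.newWorkStealingPool(8)

    println("available processors: ${Runtime.getRuntime().availableProcessors()}")
    println("default parallelism:  ${parallelismOf(byDefault)}")
    println("explicit parallelism: ${parallelismOf(explicit)}")

    byDefault.shutdown()
    explicit.shutdown()
}
```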

    there is some specific API with explicit fork/join, and it looks like it will require rewriting the whole application

    Probably so. To make use of work stealing, you need to submit your tasks as ForkJoinTasks (though probably more specifically as instances of one of that class's subclasses), and you will want those tasks to fork() their subtasks instead of submitting them directly to the pool.
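
    For a sense of what that rewrite might look like, here is a minimal sketch of the question's example expressed with RecursiveTask. The class names `ExternalCall`, `Subtask` and `Aggregate` are illustrative assumptions, the sleep is shortened to 50 ms, and the counter is passed in explicitly rather than kept in a companion object.

```kotlin
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.RecursiveTask
import java.util.concurrent.atomic.AtomicInteger

// Leaf task standing in for SingleExternalCall. Note that it still blocks
// its worker thread while sleeping.
class ExternalCall(private val counter: AtomicInteger) : RecursiveTask<Int>() {
    override fun compute(): Int {
        Thread.sleep(50)
        return counter.incrementAndGet()
    }
}

// Forks `calls` leaf tasks and sums their results.
class Subtask(private val calls: Int, private val counter: AtomicInteger) : RecursiveTask<Int>() {
    override fun compute(): Int {
        val forked = (1..calls).map { ExternalCall(counter).also { it.fork() } }
        return forked.sumOf { it.join() }
    }
}

// Forks one subtask, runs the other in the current worker, then aggregates.
class Aggregate(private val counter: AtomicInteger) : RecursiveTask<Int>() {
    override fun compute(): Int {
        val first = Subtask(10, counter).also { it.fork() }
        val second = Subtask(5, counter)
        return second.invoke() + first.join()
    }
}

fun main() {
    // Same thread count that made the ThreadPoolExecutor version hang.
    val pool = ForkJoinPool(3)
    println(pool.invoke(Aggregate(AtomicInteger()))) // prints 120
    pool.shutdown()
}
```

    The 15 leaf tasks return the distinct values 1 through 15, so the sum is 120 regardless of scheduling. A worker that calls join() on a forked task can run pending tasks instead of idling, which is why three threads are enough here.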

    If it's going to be disruptive to implement that then that's a good reason to hesitate. It might still be, however, that that's a change worth making.

    1. Is it a good idea to use ForkJoinPool/Executors.newWorkStealingPool() in my example?

    Your particular example is structured in a way that is well suited to a ForkJoinPool, and using such a pool appropriately should solve your blocking and parallelism problems. It's a good idea in that sense.

    But the structure of your example is also simple enough and clear enough that you could reasonably solve those same problems by choosing a larger number of threads for a fixed thread pool. Were I writing that example from scratch, I would probably go with the ForkJoinPool, as it seems better matched, but it's not clear whether the same analysis would apply to your real application. Nor whether it would be worth the effort to modify your existing real application to make effective use of a ForkJoinPool.

    2. If yes, which API is the best one?

    We generally don't answer "best" questions here, as they tend both to be a matter of opinion and to be situational. Hopefully my remarks above give you a better basis to judge for yourself.