javamultithreadingfork-joinforkjoinpool

How to configure a single-threaded ForkJoinPool?


Is it possible to configure ForkJoinPool to use 1 execution thread?

I am executing code that invokes Random inside a ForkJoinPool. Every time it runs, I end up with different runtime behavior, making it difficult to investigate regressions.

I would like the codebase to offer "debug" and "release" modes. "debug" mode would configure Random with a fixed seed, and ForkJoinPool with a single execution thread. "release" mode would use system-provided Random seeds and use the default number of ForkJoinPool threads.

I tried configuring ForkJoinPool with a parallelism of 1, but it uses 2 threads (main and a second worker thread). Any ideas?


Solution

  • So, it turns out I was wrong.

    When you configure a ForkJoinPool with parallelism set to 1, only one thread executes the tasks. The main thread is blocked on ForkJoin.get(). It doesn't actually execute any tasks.

    That said, it turns out that it is really tricky providing deterministic behavior. Here are some of the problems I had to correct:

    Here is a sample implementation of ForkJoinWorkerThreadFactory:

    class MyForkJoinWorkerThread extends ForkJoinWorkerThread
    {
        MyForkJoinWorkerThread(ForkJoinPool pool)
        {
            super(pool);
            // Change thread name after ForkJoinPool.registerWorker() does the same
            setName("DETERMINISTIC_WORKER");
        }
    }
    
    ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory()
    {
        private WeakReference<Thread> currentWorker = new WeakReference<>(null);
    
        @Override
        public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool)
        {
            // If the pool already has a live thread, wait for it to shut down.
            Thread thread = currentWorker.get();
            if (thread != null && thread.isAlive())
            {
                try
                {
                    thread.join();
                }
                catch (InterruptedException e)
                {
                    log.error("", e);
                }
            }
            ForkJoinWorkerThread result = new MyForkJoinWorkerThread(pool);
            currentWorker = new WeakReference<>(result);
            return result;
        }
    };