Is it possible to configure ForkJoinPool
to use 1 execution thread?
I am executing code that invokes Random
inside a ForkJoinPool
. Every time it runs, I end up with different runtime behavior, making it difficult to investigate regressions.
I would like the codebase to offer "debug" and "release" modes. "debug" mode would configure Random
with a fixed seed, and ForkJoinPool
with a single execution thread. "release" mode would use system-provided Random
seeds and use the default number of ForkJoinPool
threads.
I tried configuring ForkJoinPool
with a parallelism of 1, but it uses 2 threads (main
and a second worker thread). Any ideas?
So, it turns out I was wrong.
When you configure a ForkJoinPool
with parallelism
set to 1, only one thread executes the tasks. The main
thread is blocked on ForkJoin.get()
. It doesn't actually execute any tasks.
That said, it turns out that it is really tricky providing deterministic behavior. Here are some of the problems I had to correct:
ForkJoinPool
was executing tasks using different worker threads (with different names) if the worker thread became idle long enough. For example, if the main thread got suspended on a debugging breakpoint, the worker thread would become idle and shut down. When I would resume execution, ForkJoinThread
would spin up a new worker thread with a different name. To solve this, I had to provide a custom ForkJoinWorkerThreadFactory
implementation that ensures only one thread runs at a time, and that its name is hard-coded. I also had ensure that my code was returning the same Random
instance even if a worker thread shut down and came back again.HashMap
or HashSet
led to elements grabbing random numbers in a different order on every run. I corrected this by using LinkedHashMap
and LinkedHashSet
.Enum.hashCode()
. I forget what problems this caused but I corrected it by calculating the hashCode() myself instead of relying on the built-in method.Here is a sample implementation of ForkJoinWorkerThreadFactory:
class MyForkJoinWorkerThread extends ForkJoinWorkerThread
{
MyForkJoinWorkerThread(ForkJoinPool pool)
{
super(pool);
// Change thread name after ForkJoinPool.registerWorker() does the same
setName("DETERMINISTIC_WORKER");
}
}
ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory()
{
private WeakReference<Thread> currentWorker = new WeakReference<>(null);
@Override
public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool)
{
// If the pool already has a live thread, wait for it to shut down.
Thread thread = currentWorker.get();
if (thread != null && thread.isAlive())
{
try
{
thread.join();
}
catch (InterruptedException e)
{
log.error("", e);
}
}
ForkJoinWorkerThread result = new MyForkJoinWorkerThread(pool);
currentWorker = new WeakReference<>(result);
return result;
}
};