So, it turns out I was wrong.
When you configure a ForkJoinPool with parallelism set to 1, only one thread executes the tasks. The main thread is blocked on ForkJoinTask.get(); it doesn't actually execute any tasks.
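A quick way to check this yourself is to have the task report which thread ran it. This is only a minimal sketch: the class name SingleWorkerDemo and the helper runningThreadName() are made up for illustration, and the exact worker name you see depends on the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class SingleWorkerDemo
{
    // Hypothetical helper: returns the name of the thread that ran the task.
    static String runningThreadName() throws Exception
    {
        ForkJoinPool pool = new ForkJoinPool(1); // parallelism = 1
        try
        {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            ForkJoinTask<String> task = pool.submit(whoAmI);
            // The caller blocks here until the worker finishes the task.
            return task.get();
        }
        finally
        {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + runningThreadName());
    }
}
```

The two names printed should differ, which matches the observation that the submitting thread only waits.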
That said, providing deterministic behavior turns out to be really tricky. Here are some of the problems I had to correct:
- ForkJoinPool was executing tasks using different worker threads (with different names) if the worker thread became idle long enough. For example, if the main thread got suspended on a debugging breakpoint, the worker thread would become idle and shut down. When I resumed execution, ForkJoinPool would spin up a new worker thread with a different name. To solve this, I had to provide a custom ForkJoinWorkerThreadFactory implementation that ensures only one thread runs at a time, and that its name is hard-coded. I also had to ensure that my code was returning the same Random instance even if a worker thread shut down and came back again.
- Collections with non-deterministic iteration order such as HashMap or HashSet led to elements grabbing random numbers in a different order on every run. I corrected this by using LinkedHashMap and LinkedHashSet.
- Objects with non-deterministic hashCode() implementations, such as Enum.hashCode() (which is identity-based, so its value can change from one JVM run to the next). I forget what problems this caused but I corrected it by calculating the hashCode() myself instead of relying on the built-in method.
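To illustrate the last two points, here is a rough sketch rather than my actual code. The names DeterministicOrderDemo, Color, Tile and draw() are invented for the example. It iterates a LinkedHashSet so that elements pull numbers from a seeded Random in insertion order, and it hashes an enum field through name(), whose String hash is specified by the language, instead of through Enum.hashCode().

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class DeterministicOrderDemo
{
    enum Color { RED, GREEN, BLUE }

    // Hypothetical element type whose hash does not depend on Enum.hashCode().
    static final class Tile
    {
        final Color color;
        final int size;

        Tile(Color color, int size)
        {
            this.color = color;
            this.size = size;
        }

        @Override
        public boolean equals(Object other)
        {
            if (!(other instanceof Tile))
                return false;
            Tile that = (Tile) other;
            return color == that.color && size == that.size;
        }

        @Override
        public int hashCode()
        {
            // String.hashCode() is specified, so this is stable across runs.
            return 31 * color.name().hashCode() + size;
        }

        @Override
        public String toString()
        {
            return color + "/" + size;
        }
    }

    // Each tile grabs one random number, in insertion order.
    static List<String> draw(long seed)
    {
        Set<Tile> tiles = new LinkedHashSet<>();
        tiles.add(new Tile(Color.RED, 1));
        tiles.add(new Tile(Color.GREEN, 2));
        tiles.add(new Tile(Color.BLUE, 3));

        Random random = new Random(seed);
        List<String> result = new ArrayList<>();
        for (Tile tile : tiles)
            result.add(tile + "=" + random.nextInt(100));
        return result;
    }

    public static void main(String[] args)
    {
        System.out.println(draw(42L));
        System.out.println(draw(42L));
    }
}
```

Both calls to draw() with the same seed should print the same list, because neither the iteration order nor the hash values depend on object identity.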
Here is a sample implementation of ForkJoinWorkerThreadFactory:
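import java.lang.ref.WeakReference;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

class MyForkJoinWorkerThread extends ForkJoinWorkerThread
{
    MyForkJoinWorkerThread(ForkJoinPool pool)
    {
        super(pool);
        // Change thread name after ForkJoinPool.registerWorker() does the same
        setName("DETERMINISTIC_WORKER");
    }
}

ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory()
{
    // Weak reference so a dead worker can still be garbage-collected.
    private WeakReference<Thread> currentWorker = new WeakReference<>(null);

    @Override
    public synchronized ForkJoinWorkerThread newThread(ForkJoinPool pool)
    {
        // If the pool already has a live thread, wait for it to shut down.
        Thread thread = currentWorker.get();
        if (thread != null && thread.isAlive())
        {
            try
            {
                thread.join();
            }
            catch (InterruptedException e)
            {
                // "log" is the enclosing class's logger (not shown here).
                log.error("", e);
                // Restore the interrupt flag so callers can still see it.
                Thread.currentThread().interrupt();
            }
        }
        ForkJoinWorkerThread result = new MyForkJoinWorkerThread(pool);
        currentWorker = new WeakReference<>(result);
        return result;
    }
};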
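For completeness, this is roughly how such a factory gets plugged into a pool. Treat it as a sketch under assumptions: NamedWorkerPoolDemo, NamedWorker and workerName() are names invented for this example, and the factory here is a simplified stand-in that only renames the worker and leaves out the wait-for-previous-worker logic shown above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

public class NamedWorkerPoolDemo
{
    // Simplified stand-in for the worker class: fixed thread name only.
    static final class NamedWorker extends ForkJoinWorkerThread
    {
        NamedWorker(ForkJoinPool pool)
        {
            super(pool);
            setName("DETERMINISTIC_WORKER");
        }
    }

    // Hypothetical helper: runs one task and reports the worker's name.
    static String workerName() throws Exception
    {
        ForkJoinWorkerThreadFactory factory = NamedWorker::new;
        // parallelism = 1, custom factory, no exception handler, LIFO mode
        ForkJoinPool pool = new ForkJoinPool(1, factory, null, false);
        try
        {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        }
        finally
        {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(workerName());
    }
}
```

The four-argument ForkJoinPool constructor is the part that matters here; anything keyed on the thread name should then see the same name regardless of how often the worker is recreated.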