I am trying to understand how the Java ForkJoin framework works. The simplest thing I could think of implementing was an array sum. However, my parallel implementation is 3-4 times slower than the serial one. I must obviously be doing something wrong, but I am not sure what.
To measure the performance, I created a set of classes and interfaces (I used Lombok annotations for generating boilerplate code).
import java.util.function.Supplier;

import lombok.AllArgsConstructor;
import lombok.Value;

interface Result<T> {
    T getValue();
}

@AllArgsConstructor(staticName = "of")
@Value
class MeasuredResult<T> implements Result<T> {
    T value;
    long elapsedTimeMillis;
}

@AllArgsConstructor(staticName = "of")
class CombinedResult<T> implements Result<T> {
    private final MeasuredResult<T> parallelResult;
    private final MeasuredResult<T> serialResult;

    public double getParallelizationFactor() {
        return (double) serialResult.getElapsedTimeMillis() / parallelResult.getElapsedTimeMillis();
    }

    public T getParallelValue() {
        return parallelResult.getValue();
    }

    public T getSerialValue() {
        return serialResult.getValue();
    }

    @Override
    public T getValue() {
        return getSerialValue();
    }

    public boolean isDifferent() {
        return !isSame();
    }

    public boolean isSame() {
        return parallelResult.getValue().equals(serialResult.getValue());
    }
}

interface Parallelizable<T> {
    T processParallelly();

    T processSerially();

    default CombinedResult<T> getResult() {
        MeasuredResult<T> parallelResult = measureParallel();
        MeasuredResult<T> serialResult = measureSerial();
        return CombinedResult.of(parallelResult, serialResult);
    }

    default MeasuredResult<T> measure(Supplier<T> supplier) {
        long startTime = System.currentTimeMillis();
        T value = supplier.get();
        long endTime = System.currentTimeMillis();
        return MeasuredResult.of(value, endTime - startTime);
    }

    default MeasuredResult<T> measureParallel() {
        return measure(this::processParallelly);
    }

    default MeasuredResult<T> measureSerial() {
        return measure(this::processSerially);
    }
}
The idea was that by implementing the Parallelizable interface, I'd define the serial and parallel versions of the code, then call getResult() to get a CombinedResult holding both values and the time measurements, which I can unit test against (a sample driver is sketched after the implementation below). Here's my implementation of the array sum.
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

import lombok.AllArgsConstructor;

@AllArgsConstructor
public class ArraySum implements Parallelizable<Integer> {
    private final int[] nums;

    @Override
    public Integer processParallelly() {
        return new ParallelForkJoinImpl(0, nums.length).compute();
    }

    @Override
    public Integer processSerially() {
        int sum = 0;
        for (int num : nums) {
            sum += num;
        }
        return sum;
    }

    @AllArgsConstructor
    private class ParallelForkJoinImpl extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1_000;
        private final int start;
        private final int end;

        @Override
        protected Integer compute() {
            // Small enough slice: sum it directly.
            if (end - start <= THRESHOLD) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            }
            // Otherwise split the range in half, fork both subtasks and join them.
            int mid = (start + end) / 2;
            ForkJoinTask<Integer> left = new ParallelForkJoinImpl(start, mid).fork();
            ForkJoinTask<Integer> right = new ParallelForkJoinImpl(mid, end).fork();
            return left.join() + right.join();
        }
    }
}
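For reference, a minimal driver for this harness would look something like the following (ArraySumDemo, the array size, and the value range are made up purely for illustration):

import java.util.Random;

public class ArraySumDemo {
    public static void main(String[] args) {
        // 10 million random ints in [0, 100); small enough that the total still fits in an int.
        int[] nums = new Random().ints(10_000_000, 0, 100).toArray();

        CombinedResult<Integer> result = new ArraySum(nums).getResult();

        // Both implementations compute the same sum, but for me the
        // parallelization factor comes out well below 1 (the slowdown described above).
        System.out.println("Same result?     " + result.isSame());
        System.out.println("Speed-up factor: " + result.getParallelizationFactor());
    }
}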
From what I understand, calling fork() on the RecursiveTask implementation should give me back a Future-style handle that blocks for the computation when join() is called on it. Also, the common ForkJoinPool will automatically be used when fork() is called.
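To spell out that understanding, here is the kind of explicit submission I would expect to behave roughly like the compute() call in processParallelly() above; the method name processParallellyExplicit is hypothetical and only added to illustrate the assumption:

// Hypothetical extra method inside ArraySum (needs java.util.concurrent.ForkJoinPool imported):
// submit the root task to the common pool explicitly instead of calling compute() on the caller thread.
public Integer processParallellyExplicit() {
    return ForkJoinPool.commonPool().invoke(new ParallelForkJoinImpl(0, nums.length));
}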
But like I said, the elapsedTimeMillis value for the parallel implementation is 3-4 times larger than for the serial implementation, and I don't know why. What did I do wrong here?