0

So I am trying to understand how Java ForkJoin framework works. The simplest thing I could think of implementing was an array sum. However my parallel implementation is 3-4 times slower than the serial implementation. I must obviously be doing something wrong, but I am not sure what.

To measure the performance, I created a set of classes and interfaces (I used Lombok annotations for generating boilerplate code).

interface Result<T> {
    T getValue();
}

@AllArgsConstructor(staticName = "of")
@Value
class MeasuredResult<T> implements Result<T> {
    T value;
    long elapsedTimeMillis;
}

@AllArgsConstructor(staticName = "of")
class CombinedResult<T> implements Result<T> {
    private final MeasuredResult<T> parallelResult;
    private final MeasuredResult<T> serialResult;

    public double getParallelizationFactor() {
        return (double) serialResult.getElapsedTimeMillis() / parallelResult.getElapsedTimeMillis();
    }

    public T getParallelValue() {
        return parallelResult.getValue();
    }

    public T getSerialValue() {
        return parallelResult.getValue();
    }

    @Override
    public T getValue() {
        return getSerialValue();
    }

    public boolean isDifferent() {
        return !isSame();
    }

    public boolean isSame() {
        return parallelResult.getValue().equals(serialResult.getValue());
    }
}

interface Parallelizable<T> {
    T processParallelly();

    T processSerially();

    default CombinedResult<T> getResult() {
        MeasuredResult<T> parallelResult = measureParallel();
        MeasuredResult<T> serialResult = measureSerial();

        return CombinedResult.of(parallelResult, serialResult);
    }

    default MeasuredResult<T> measure(Supplier<T> supplier) {
        long startTime = System.currentTimeMillis();
        T value = supplier.get();
        long endTime = System.currentTimeMillis();

        return MeasuredResult.of(value, endTime - startTime);
    }

    default MeasuredResult<T> measureParallel() {
        return measure(this::processParallelly);
    }

    default MeasuredResult<T> measureSerial() {
        return measure(this::processSerially);
    }
}

The idea was that by implementing the Parallelizable interface, I'd define the serial and parallel versions of the code and use the getResult() function to get a CombinedResult object with the values and time measurement to unit test with. Here's my implementation of the array sum.

@AllArgsConstructor
public class ArraySum implements Parallelizable<Integer> {
    private final int[] nums;

    @Override
    public Integer processParallelly() {
        return new ParallelForkJoinImpl(0, nums.length).compute();
    }

    @Override
    public Integer processSerially() {
        int sum = 0;
        for (int num : nums) {
            sum += num;
        }
        return sum;
    }

    @AllArgsConstructor()
    private class ParallelForkJoinImpl extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 1_000;

        private final int start;
        private final int end;

        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            }
            int mid = (start + end) / 2;
            ForkJoinTask<Integer> left = new ParallelForkJoinImpl(start, mid).fork();
            ForkJoinTask<Integer> right = new ParallelForkJoinImpl(mid, end).fork();

            return left.join() + right.join();
        }
    }
}

From what I understand, calling fork() on the RecursiveTask implementation should give me a Future object as response which will block on computation when the join() function is called on it. Also use the common ForkJoinPool will automatically be used when fork() is called.

But like I said, the value for elapsedTimeMillis for the parallel implementation is 3-4 times larger than the serial implementation, and I don't know why. What did I do wrong here?

aa8y
  • 3,854
  • 4
  • 37
  • 62
  • 1
    The overhead of using fork/join is high compared to the trivial time it takes to sum numbers. Try increasing THRESHOLD. – tgdavies Feb 05 '22 at 07:26
  • @tgdavies That was it. Thank you! – aa8y Feb 07 '22 at 04:49
  • 2
    Generally, you shouldn’t use a threshold for the number of items to process, but a threshold for the number of jobs to spawn, which should be adjusted to the number of actually available worker threads which can pick up the new jobs. Read [this answer](https://stackoverflow.com/a/48174508/2711488) for more details. Further, while textbooks suggest to spawn two sub tasks, followed by waiting for both, it’s more efficient to spawn one task and process the other directly in the already running thread. – Holger Feb 07 '22 at 17:30
  • 2
    Practical examples in [this answer](https://stackoverflow.com/a/61122549/2711488) (second half) and [this answer](https://stackoverflow.com/a/48212098/2711488). – Holger Feb 07 '22 at 17:31
  • @Holger That actually makes a lot of sense. Thank you for pointing that out and providing links for the reading material. – aa8y Feb 14 '22 at 18:20

0 Answers0