7

Consider this (completely contrived) Java code:

final List<Integer> s = Arrays.asList(1, 2, 3);
final int[] a = new int[1];
a[0] = 100;
s.parallelStream().forEach(i -> {
    synchronized (a) {
        a[0] += i;
    }
});
System.out.println(a[0]);

Is this code guaranteed to output "106"?

It seems that it is not, unless parallelStream() establishes a happens-before relationship that guarantees the first accesses to a[0] in the lambda will see 100 and not zero (according to my understanding of the Java memory model).

But Collection.parallelStream() is not documented to establish such a relationship...

The same question can be asked about the completion of the pipeline: is the final read of a[0] guaranteed to see every update made in the lambda?

So am I missing something, or is it true that, for correctness, the above code would need to look something like this instead:

final List<Integer> s = Arrays.asList(1, 2, 3);
final int[] a = new int[1];
synchronized (a) {
    a[0] = 100;
}
s.parallelStream().forEach(i -> {
    synchronized (a) {
        a[0] += i;
    }
});
synchronized (a) {
    System.out.println(a[0]);
}

Or... does parallelStream() actually provide these happens-before relationships, and is this simply a matter of some missing documentation?

I'm asking because from an API design perspective, it seems (to me at least) like this would be a logical thing to do... analogous to Thread.start(), etc.
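For comparison, here is a minimal sketch of the Thread.start() analogy, where the edges are documented: a call to start() happens-before any action in the started thread, and all actions in a thread happen-before another thread returns successfully from join() on it. The class name StartJoinSum and the helper sum are made up for illustration and appear nowhere in the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartJoinSum {

    // Hypothetical helper: adds every value to a shared cell using raw threads.
    static int sum(List<Integer> values, int initial) {
        final int[] a = new int[1];
        a[0] = initial; // written before start(), so every started thread sees it

        List<Thread> threads = new ArrayList<>();
        for (Integer i : values) {
            Thread t = new Thread(() -> {
                synchronized (a) { // still needed: the workers race with each other
                    a[0] += i;
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // everything the worker did is visible once join() returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return a[0]; // safe to read without extra synchronization after the joins
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3), 100)); // prints 106
    }
}
```

Here neither the initial write nor the final read needs a synchronized block, which is the guarantee I would like to be able to rely on for parallelStream() as well.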

Ousmane D.
Archie
  • I'm not sure where it's specified, but I'm pretty sure there is some *happens-before* edge before running a stream pipeline, whether sequential or parallel. Otherwise all bets are off. You might populate a list, stream it to a set, and find it empty. On a side note, you can avoid synchronizing your `forEach()` actions by using [`forEachOrdered()`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEachOrdered-java.util.function.Consumer-) instead. – shmosel Dec 23 '18 at 22:17
  • 2
    Note that parallel streams internally use the ForkJoinPool ([though that's not well-documented](https://stackoverflow.com/q/24629247/1553851)), which implements ExecutorService, along with the [visibility guarantees](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#MemoryVisibility) related to task submission. – shmosel Dec 23 '18 at 22:29
  • 4
    I think we all agree that there's probably a happens-before edge in there somewhere, practically speaking. What bugs me is that this is not guaranteed and documented, so for correctness you have to assume there's not. – Archie Dec 24 '18 at 19:45
  • @shmosel I’m very sure that you are allowed to always assume a valid *result* being returned from a parallel stream’s terminal operation, i.e. in case of `collect(toSet())`, that would be the returned `Set`. But *side effects*, as performed by `peek` or `forEach` are a different beast… – Holger Mar 25 '19 at 17:27
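To illustrate the visibility guarantees mentioned in the comments: the java.util.concurrent package documentation states that actions prior to submitting a task to an ExecutorService happen-before the task's actions, which in turn happen-before the result is retrieved via Future.get(). The sketch below relies only on those documented edges by submitting the tasks explicitly instead of using a parallel stream. The class name ExplicitExecutorSum, the helper sum, and the pool size are assumptions for illustration, and this is not a claim about what parallelStream() does internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExplicitExecutorSum {

    // Hypothetical helper: one task per element, submitted to an explicit pool.
    static int sum(List<Integer> values, int initial) {
        final int[] a = new int[1];
        a[0] = initial; // written before submit(), so each task sees it

        ExecutorService executor = Executors.newFixedThreadPool(3); // arbitrary size
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Integer i : values) {
                futures.add(executor.submit(() -> {
                    synchronized (a) { // tasks still race with each other
                        a[0] += i;
                    }
                }));
            }
            for (Future<?> f : futures) {
                try {
                    f.get(); // task actions are visible once get() returns
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            }
        } finally {
            executor.shutdown();
        }
        return a[0];
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3), 100)); // prints 106
    }
}
```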

3 Answers

1

You really should avoid touching variables 'outside' the pipeline. Even if you get it to work correctly, performance will likely suffer. The JDK has a lot of built-in tools for this. For example, your use case is probably safer with something like:

int sum = 100 + IntStream.of(1, 2, 3)
            .parallel()
            .reduce(0, (accumulator, element) -> accumulator + element); // the identity must be 0: a parallel reduce may apply it once per chunk
Adam Bickford
0

Here is a list of actions that establish a happens-before relationship. As you can see parallelStream is not mentioned there, so to answer your question: no, parallelStream by itself doesn't establish a happens-before relationship.

As to the first access reading zero: if the main thread sets 100 before the parallelStream is processed, then each thread the parallelStream starts will see that value. Quoting from the link:

A call to start on a thread happens-before any action in the started thread.

BTW, your lambda expression is stateful, which is discouraged.

limido
  • 4
    The logic in this answer is incorrect. You seem to be assuming that that list and the JLS are the only places where *happens before* relationships are / may be specified. – Stephen C Dec 23 '18 at 22:44
  • 2
    The threads in the thread pool may have already started (this is of course one of the main goals of "thread pools" in the first place). So the guarantee provided by `Thread.start()` doesn't necessarily apply here. – Archie Dec 24 '18 at 19:47
0

I personally use the code below to guarantee the result:

        final List<Integer> s = Arrays.asList(1, 2, 3);
        AtomicInteger atomicInteger = new AtomicInteger(100);
        s.parallelStream()
                .forEach(atomicInteger::addAndGet);
        System.out.println(atomicInteger.get());

Using a parallel stream for so few elements is not good practice.

parrotjack