
While reading the documentation about streams, I came across the following sentences:

  • ... attempting to access mutable state from behavioral parameters presents you with a bad choice ... if you do not synchronize access to that state, you have a data race and therefore your code is broken ... [1]

  • If the behavioral parameters do have side-effects ... [there are no] guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread. [2]

  • For any given element, the action may be performed at whatever time and in whatever thread the library chooses. [3]

These sentences don't make a distinction between sequential and parallel streams. So my questions are:

  1. In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?
  2. In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?
  3. Do I have to use any synchronization when using sequential streams?

Stefan Zobel
Alex R

2 Answers


This all boils down to what is guaranteed based on the specification, and the fact that a current implementation may have additional behaviors beyond what is guaranteed.

Java Language Architect Brian Goetz made a relevant point regarding specifications in a related question:

Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.

[...]

When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)

This means that even if the current implementation happens to have certain behavioral characteristics, they should not be relied upon, nor assumed to stay the same in future versions of the library.

Sequential stream pipeline thread

In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?

Stream implementations may or may not use the calling thread, and may use one or multiple threads. As none of this is specified by the API, this behavior should not be relied on.

forEach execution thread

In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?

While current implementations use the calling thread, this cannot be relied on, as the documentation states that the choice of thread is up to the implementation. In fact, there is no guarantee that different elements are even processed by the same thread, though the current stream implementation does not do that either.

Per the API:

For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

Note that while the API calls out parallel streams specifically when discussing encounter order, Brian Goetz has clarified that this was done to explain the motivation for the behavior, not to imply that any of the behavior is specific to parallel streams:

The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.
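A related point can be made concrete with `forEachOrdered`: its Javadoc says that the action for one element happens-before the action for subsequent elements, yet each action may still be performed in whatever thread the library chooses. A minimal sketch, where `ForEachOrderedDemo` and `collectInOrder` are illustrative names, not library API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class ForEachOrderedDemo {
    // Appends 0..n-1 to a plain ArrayList from a *parallel* stream.
    // forEachOrdered respects encounter order, and each action happens-before
    // the next one, even though the actions may run on different threads.
    static List<Integer> collectInOrder(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n)
                .parallel()
                .forEachOrdered(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectInOrder(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Replacing `forEachOrdered` with `forEach` in the parallel case would turn the unsynchronized `ArrayList` into a data race.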

Synchronization using sequential streams

Do I have to use any synchronization when using sequential streams?

Code that shares unsynchronized state will most likely work with current implementations, since they run a sequential stream's forEach on a single thread. However, as this is not guaranteed by the stream specification, it should not be relied on. Therefore, synchronization should be used as though the action could be called by multiple threads.
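One way to follow that advice, sketched here under the assumption that a counter and a collection of results are the shared state, is to use thread-safe classes (`LongAdder`, `ConcurrentLinkedQueue`) so the code stays correct whether the library runs the action on one thread or several. The class and method names are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class SafeSideEffects {
    // Counts the even numbers in [0, n) and copies them into a thread-safe queue.
    // Both targets tolerate concurrent updates, so this stays correct whether the
    // stream is sequential or parallel, and whichever threads the library uses.
    static long countAndCopyEvens(int n, Queue<Integer> sink) {
        LongAdder counter = new LongAdder();
        IntStream.range(0, n)
                .parallel() // correctness does not depend on this line
                .filter(i -> i % 2 == 0)
                .forEach(i -> {
                    counter.increment();
                    sink.add(i);
                });
        return counter.sum();
    }

    public static void main(String[] args) {
        Queue<Integer> sink = new ConcurrentLinkedQueue<>();
        long count = countAndCopyEvens(1_000, sink);
        System.out.println(count + " " + sink.size()); // 500 500
    }
}
```

A reduction such as `filter(...).count()` would avoid the shared state altogether.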

That said, the stream documentation specifically recommends against side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators:

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String> results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!
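The Javadoc snippets assume a `stream` and a `pattern` defined elsewhere. A self-contained version of the side-effect-free variant might look like this; the input words, the regular expression `a.*`, and the class name are made up for illustration:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class RegexFilterDemo {
    // Keeps the strings that fully match the pattern, with no shared mutable state.
    static List<String> matching(List<String> input, Pattern pattern) {
        return input.parallelStream()                      // safe even in parallel
                .filter(s -> pattern.matcher(s).matches())
                .collect(Collectors.toList());             // keeps encounter order
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "banana", "avocado", "cherry");
        System.out.println(matching(words, Pattern.compile("a.*"))); // [apple, avocado]
    }
}
```

Because nothing is shared between threads, switching between `stream()` and `parallelStream()` doesn't affect correctness.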
M. Justin
  • This is a nice summary of quotes I had also found back then. I've also come to the conclusion that it's unspecified behavior and a different `Stream` implementation might not use the calling thread for executing the pipeline. Even though your last paragraph mentions an anti-pattern, it's simply not always possible to avoid side-effects. After all, one use case of `Stream`s is to handle data that might be too large to fit in memory. Since nobody else has provided a better answer (backed by quotes), I believe this answer is correct. – Alex R Jan 28 '21 at 08:49
  • Yeah, they even specifically call out that `forEach` & `peek` are designed to operate via side effects, just that they "should be used with care". – M. Justin Jan 28 '21 at 15:37
  • @AlexR Yeah, honestly I'd also prefer something a bit more from the horse's mouth than a collection of quotes & specs not explicitly ruling out behavior. – M. Justin Jan 28 '21 at 21:15
  1. Stream terminal operations are blocking operations. When there is no parallel execution, the thread that executes the terminal operation runs all the operations in the pipeline.

Definition 1.1. A pipeline is a chain of stream method calls: a source, zero or more intermediate operations, and a terminal operation.

Definition 1.2. Intermediate operations can appear anywhere in the pipeline except at the end. They return a new stream and do not execute any operation in the pipeline; they are lazy.

Definition 1.3. Terminal operations appear only at the end of the pipeline, and they execute it. They do not return a stream, so no further intermediate or terminal operations can be chained after them.
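A small sketch of Definitions 1.2 and 1.3, with hypothetical names: the `map` step records each element it processes (a side effect used only to make execution visible), which shows that nothing runs until the terminal `sum()` is called, and that on a sequential stream all elements have been processed by the time `sum()` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    // Returns {entries logged before the terminal op, entries logged after it, the sum}.
    static int[] run() {
        List<String> log = new ArrayList<>();
        // Intermediate operation: builds the pipeline but processes no element yet.
        Stream<Integer> doubled = Stream.of(1, 2, 3)
                .map(x -> {
                    log.add("map " + x); // only here to make execution visible
                    return x * 2;
                });
        int before = log.size();
        // Terminal operation: runs the whole pipeline before returning.
        int sum = doubled.mapToInt(Integer::intValue).sum();
        return new int[] {before, log.size(), sum};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 0 3 12
    }
}
```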

  2. From the first point, we can conclude that the calling thread will execute the action method inside the forEach terminal operation on each element of the calling stream.

Java 8 introduced the Spliterator interface. It has the capabilities of an Iterator, plus operations that help split a task and perform it in parallel.
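A sketch of the splitting capability, with hypothetical names: `trySplit()` divides the spliterator of an `IntStream.range`, and `forEachRemaining` drains each part. Here both parts are drained on the calling thread; a parallel stream may hand such parts to different worker threads. Where the split happens is implementation-dependent, so the example only prints the two parts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class SplitDemo {
    // Splits [0, n) once and returns the elements seen by the prefix and by the rest.
    static List<List<Integer>> splitOnce(int n) {
        Spliterator.OfInt rest = IntStream.range(0, n).spliterator();
        // For an ORDERED spliterator, trySplit() hands off a prefix of the elements,
        // or returns null if it declines to split.
        Spliterator.OfInt prefix = rest.trySplit();

        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        IntConsumer addFirst = x -> first.add(x);
        IntConsumer addSecond = x -> second.add(x);

        if (prefix != null) {
            prefix.forEachRemaining(addFirst);
        }
        rest.forEachRemaining(addSecond);
        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = splitOnce(10);
        System.out.println(parts.get(0) + " " + parts.get(1));
    }
}
```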

When forEach is called on a sequential primitive stream (this excerpt is from the IntStream implementation in the JDK), the calling thread invokes the Spliterator.forEachRemaining method:

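@Override
public void forEach(IntConsumer action) {
    if (!isParallel()) {
        // Sequential: traverse the source spliterator directly on the calling thread
        adapt(sourceStageSpliterator()).forEachRemaining(action);
    }
    else {
        // Parallel: delegate to the general (superclass) implementation
        super.forEach(action);
    }
}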
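Recording the thread inside the action, as suggested in the comments, is an easy way to see what a given JDK does. The sketch below (hypothetical names) records every thread that runs the `forEach` action. On current OpenJDK releases the sequential case reports only the calling thread, but that is an observation about the implementation, not something the `forEach` specification promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class WhichThread {
    // Returns true if every forEach action ran on the thread that called this method.
    // This reports what the current library does; it is not a guarantee from the spec.
    static boolean actionsRanOnCallingThread(boolean parallel) {
        Thread caller = Thread.currentThread();
        // Thread-safe set, because in the parallel case several threads may add to it.
        Set<Thread> seen = ConcurrentHashMap.newKeySet();
        IntStream s = IntStream.range(0, 10_000);
        if (parallel) {
            s = s.parallel();
        }
        s.forEach(i -> seen.add(Thread.currentThread()));
        return seen.size() == 1 && seen.contains(caller);
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + actionsRanOnCallingThread(false));
        System.out.println("parallel:   " + actionsRanOnCallingThread(true));
    }
}
```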

You can read more on Spliterator in my tutorial: Part 6 - Spliterator

  3. As long as you don't mutate any shared state from within stream operations (which is forbidden, as explained below), you do not need any additional synchronization tool or algorithm, even when running parallel streams.

Stream operations like reduce use accumulator and combiner functions to execute parallel streams. The stream documentation requires behavioral parameters to be non-interfering and, in most cases, stateless, so you should avoid mutating shared state.
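A minimal example of the three-argument `reduce(identity, accumulator, combiner)`, summing word lengths (an arbitrary choice; the class name is hypothetical): each thread folds its share of the elements starting from the identity, and the combiner merges the partial results, so no shared mutable state is involved.

```java
import java.util.List;

public class ReduceDemo {
    // Sums the lengths of the words without any shared mutable state.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0,                            // identity
                        (acc, w) -> acc + w.length(), // accumulator: fold in one element
                        Integer::sum);                // combiner: merge two partial sums
    }

    public static void main(String[] args) {
        System.out.println(totalLength(List.of("stream", "spliterator", "reduce"))); // 23
    }
}
```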

There are many definitions of concurrent and parallel programming. I will introduce the set of definitions that serves us best.

Definition 8.1. Concurrent programming is the ability to solve a task using additional synchronization algorithms.

Definition 8.2. Parallel programming is the ability to solve a task without using additional synchronization algorithms.

You can read more about it in my tutorial: Part 7 - Parallel Streams.

Stav Alfi
  • Thank you for your answer. Could you please provide sources for your statements? Especially for "the thread that executes the terminal operation runs all the operations in the pipeline" and "the calling thread will execute the action method". The code example doesn't really help either, because the implementation could be changed in the future (even if I don't see a reason why it should be). – Alex R Aug 25 '17 at 21:12
  • Even if the calling thread does not run all the operations in the pipeline or does not execute the action method (which makes no sense, because in that case an additional synchronization algorithm would be needed), it will be blocked. You can easily check this now by printing the thread id in the `action` function. But even if I turn out to be wrong in the future, the calling thread will still be blocked. All in all, I can't find a source for those two sentences except the source code I provided, but the most important point here is that the general behavior won't change. – Stav Alfi Aug 25 '17 at 21:34
  • How do you know that it will really be blocked? If I have just one action (e.g. `Thread.sleep(1000);`) and `forEach()` passes this one action to another thread, then `forEach()` will return immediately, which in turn means that I have to synchronize access to the data. This seems to be valid behavior of the `forEach()` method, or do I misunderstand my third quote? – Alex R Aug 25 '17 at 22:11
  • In parallel execution, if an operation of the stream library needs to synchronize between multiple threads, it does so without the caller needing to worry about it. After a worker thread (from a thread pool) finishes its execution, it waits for the next task its thread pool assigns to it. If the stream on which `forEach` is called has only one element, it is below the threshold at which the stream would choose parallel execution. In addition, `forEach` is a stateless operation, so there is no need to wait for other threads to finish their tasks (executing `action`). – Stav Alfi Aug 25 '17 at 22:28
  • Did you mean to ask about `forEachOrdered`? It is a stateful, terminal operation. The `forEachOrdered` method runs the action function on each element of the stream, in the stream's encounter order if it has one, otherwise in the order of the elements of the calling stream. Note: if element `n` (in the order of the calling stream) has not yet reached the forEachOrdered operation, then any element `m > n` that reaches forEachOrdered has to wait. – Stav Alfi Aug 25 '17 at 22:33
  • The one element was just an example. Let's say I have a million such elements. Would it be valid behavior of the `forEach()` method of a sequential stream to pass these elements to other threads? – Alex R Aug 25 '17 at 22:46
  • No! Otherwise it would be the behavior of parallel stream running `forEach` by definition. – Stav Alfi Aug 26 '17 at 08:56
  • Sorry, I don't think you're right. I think the difference between a sequential and a parallel stream is _HOW_ the pipeline is executed, not _WHERE_ it is executed (see the stream package summary). The documentation of the `forEach` method clearly states "...the action may be performed ... in whatever thread the library chooses.". The last paragraph of this [answer](https://stackoverflow.com/a/34253279/7970787) by Brian Goetz states that "calling out the parallel case explicitly here was pedagogical". – Alex R Aug 26 '17 at 15:36
  • First of all, I never claimed what you say I did. Secondly, you didn't understand what the Javadoc said. They were talking about the behavior of parallel execution only: "For parallel stream pipelines...", up to the end of the paragraph. Otherwise there would be no difference between parallel and sequential execution of the `forEach` method. You are welcome to read about the difference between `forEach` and `forEachOrdered` in parallel execution in the Javadoc. It will help you understand when threads wait for other threads to finish their tasks and when they don't. – Stav Alfi Aug 26 '17 at 15:56
  • That's why I'm confused :-) They talk about parallel streams and then the sentence ends. Brian's post states that the intention of this sentence is only to clarify the behavior of this method for people unaware of parallelism. He even says "the spec is still perfectly clear with that sentence removed entirely". Anyway, thank you for your patience! I think we should end this discussion here. Sadly I'm not yet able to use the chat. – Alex R Aug 26 '17 at 16:09
  • Edit your post with what you are still confused about. It is not clear. – Stav Alfi Aug 26 '17 at 16:25
  • There's a typo from your tutorial quoted here: it's "concurrent", not "cuncurrent". – M. Justin Jan 27 '21 at 07:00