20

I was reading about statelessness and came across this in the docs:

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline.

Now suppose I have a list of strings (say strList) and try to remove duplicate strings from it using parallel streams in the following way:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

or in case we want case insensitive:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

Can this code have any problem as parallel streams will split the input and distinct in one chunk does not necessarily mean distinct in the whole input?

EDIT (Quick summary of the answers below)

distinct is a stateful operation, and with stateful intermediate operations parallel streams may require multiple passes or substantial buffering. distinct can also be implemented more efficiently if the ordering of elements is not relevant. Also, as per the doc:

For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.

So an ordered stream stays stable even when it runs in parallel, at the buffering cost noted above. Only once the ordering constraint is dropped (e.g. with unordered()) may distinct keep an arbitrary element among duplicates rather than the first one.
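
To see which duplicate actually survives, here is a minimal sketch. The class Tagged and the method survivingPositions are hypothetical names introduced for illustration, not anything from the docs. Each element is tagged with its position, equality depends only on the key, and distinct() runs on a parallel stream from a List, which is an ordered source. Per the javadoc quoted above, the first occurrence should be the one that is preserved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StabilityCheck {
    /** Equal by key only; the position shows which duplicate survived. */
    static final class Tagged {
        final String key;
        final int position;

        Tagged(String key, int position) {
            this.key = key;
            this.position = position;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tagged && ((Tagged) o).key.equals(key);
        }

        @Override
        public int hashCode() {
            return key.hashCode();
        }
    }

    /** Positions of the elements that distinct() keeps on an ordered parallel stream. */
    static List<Integer> survivingPositions(List<String> keys) {
        List<Tagged> tagged = IntStream.range(0, keys.size())
                .mapToObj(i -> new Tagged(keys.get(i), i))
                .collect(Collectors.toList());
        return tagged.parallelStream()
                .distinct()
                .map(t -> t.position)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Ordered source: the first "a" (position 0) and first "b" (position 1) are kept.
        System.out.println(survivingPositions(Arrays.asList("a", "b", "a", "b"))); // [0, 1]
    }
}
```

Adding .unordered() before distinct() in survivingPositions removes that guarantee, so the printed positions may then vary from run to run.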

From the link:

Internally, the distinct() operation keeps a Set that contains elements that have been seen previously, but it’s buried inside the operation and we can’t get to it from application code.

So for parallel streams it would probably either consume the entire stream or use a CHM (something like ConcurrentHashMap.newKeySet()). For ordered ones it would most likely use a LinkedHashSet or a similar construct.

akhil_mittal
  • 1
    Only problem I can think of is that the order of the strings may differ from the initial order in the `strList` – smac89 Dec 06 '18 at 05:24
  • 2
    There's a hint in the apiNote of `Stream#distinct`: "**@apiNote**: Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the **operation act as a full barrier, with substantial buffering overhead**)". And the same can be asked about reduction operations too (though parallel reduction is more easily conceivable than this `distinct` operation) – ernest_k Dec 06 '18 at 05:30
  • 2
    The quoted text is about *lambdas*, and `distinct()` doesn't take a lambda, so the quoted text is irrelevant. Also, if you **read the documentation**, i.e. the javadoc of [`distinct()`](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--), you will see that it fully addresses the behavior of the method in *parallel* pipelines. The only problem is *performance*. The method guarantees *functionality*, as described by the javadoc. – Andreas Dec 06 '18 at 05:33

4 Answers

15

Roughly pointing out the relevant parts of the doc (Emphasis, mine):

Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering

If you read further down (section on ordering):

Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().

...

For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

In conclusion,

  • distinct will work fine with parallel streams, but as you may already know, it has to consume the entire stream before continuing, and this may use a lot of memory.
  • If the source of the items is an unordered collection (such as a HashSet) or the stream is unordered(), then distinct does not have to worry about ordering the output and can thus be more efficient.
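
As a quick sanity check of the first point, here is a minimal sketch (the class and method names are made up for illustration). It compares sequential and parallel distinct() on the same List: for an ordered source the two lists should be equal, order included, and the unordered variant should at least contain the same elements.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ParallelDistinctCheck {
    static List<String> sequentialDistinct(List<String> in) {
        return in.stream().distinct().collect(Collectors.toList());
    }

    static List<String> parallelDistinct(List<String> in) {
        return in.parallelStream().distinct().collect(Collectors.toList());
    }

    // Order is not guaranteed here, so the result is collected into a Set.
    static Set<String> parallelUnorderedDistinct(List<String> in) {
        return in.parallelStream().unordered().distinct().collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // 100,000 strings, only 1,000 of them distinct.
        List<String> strList = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            strList.add("s" + (i % 1000));
        }

        List<String> expected = sequentialDistinct(strList);
        System.out.println(expected.equals(parallelDistinct(strList)));
        System.out.println(new HashSet<>(expected).equals(parallelUnorderedDistinct(strList)));
    }
}
```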

The solution is to add .unordered() to the stream pipeline if you don't care about order and would like better performance.

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

Alas there is no (available builtin) concurrent hashset in Java (unless they got clever with ConcurrentHashMap), so I can only leave you with the unfortunate possibility that distinct is implemented in a blocking fashion using a regular Java set. In which case, I don't see any benefit of doing a parallel distinct.


Edit: I spoke too soon. There might be some benefit with using parallel streams with distinct. It looks like distinct is implemented with more cleverness than I initially thought. See @Eugene's answer.

smac89
  • 1
    Was reading the docs as well for this one.. *As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data.* seems to be the precise answer OP is looking for. – Naman Dec 06 '18 at 05:54
  • 1
    @smac well there actually *is* a built-in concurrent `Set`. That the factory method for it is placed in `ConcurrentHashMap` does not really change that: [`newKeySet()`](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ConcurrentHashMap.html#newKeySet()) – Hulk Dec 06 '18 at 07:36
  • The implementation (a java 8 version) I'm currently looking at actually uses a `ConcurrentHashMap` directly for the "parallel unordered" case, but uses a different (reduction-based) approach for "parallel ordered". – Hulk Dec 06 '18 at 07:58
  • @Hulk right, exactly what I mentioned in my answer, obviously a `LinkedHashSet` would be used in case of an ordered operation; but there is one more interesting approach used internally when the source is known to be sorted – Eugene Dec 06 '18 at 09:36
  • @Hulk how did you find that the implementation uses CHM for parallel unordered case. I am not able to find this in source code. – akhil_mittal Dec 12 '18 at 05:36
  • @i_am_zero I found the implementation in `DistinctOps.makeRef()` - this returns an anonymous inner class derived from `ReferencePipeline.StatefulOp` which overrides method `opEvaluateParallel()`. There is a [...]`else if (StreamOpFlag.ORDERED.isKnown(...)` - the parallel unordered case is the corresponding `else` - block (I'm looking at 1.8.0_121) – Hulk Dec 12 '18 at 07:13
  • @i_am_zero seems it is still like that: [OpenJdk 11 Source](https://hg.openjdk.java.net/jdk/jdk11/file/1ddf9a99e4ad/src/java.base/share/classes/java/util/stream/DistinctOps.java) starting at line 79 (including the TODO...) – Hulk Dec 12 '18 at 07:41
  • @Hulk Thanks :) – akhil_mittal Dec 12 '18 at 11:43
3

You seem to miss quite a few things from the documentation you provide and the actual example.

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful.

In your example, you don't have any stateful operations defined by you. Stateful in the doc refers to the ones that you define, not the ones implemented by the JDK itself, like distinct in your example. Either way, you could define a stateful operation that is still correct; even Stuart Marks, who works at Oracle on Java, provides such an example.
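
To make the distinction concrete, here is a sketch of what a stateful but still correct behavioral parameter could look like. The helper name distinctByKey and the wrapper class are assumptions for illustration, not code taken from this thread. The predicate holds state (the set of keys seen so far), yet that state lives in a thread-safe set, so a parallel pipeline still keeps exactly one element per key. Which of several equal-keyed elements survives is not deterministic in parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class StatefulLambdaDemo {
    // Stateful predicate: remembers every key it has already let through.
    // ConcurrentHashMap.newKeySet() makes add() safe to call from several threads.
    static <T, K> Predicate<T> distinctByKey(Function<? super T, ? extends K> keyExtractor) {
        Set<K> seen = ConcurrentHashMap.newKeySet();
        return t -> seen.add(keyExtractor.apply(t));
    }

    // Case-insensitive de-duplication that keeps the original spelling of the survivor.
    static List<String> caseInsensitiveDistinct(List<String> in) {
        return in.parallelStream()
                .filter(distinctByKey(String::toLowerCase))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = caseInsensitiveDistinct(Arrays.asList("Cat", "cat", "DOG", "dog", "cat"));
        // Always two elements; whether "Cat" or "cat" is kept may differ between runs.
        System.out.println(result.size());
    }
}
```

Swapping the concurrent set for a plain HashSet is the kind of stateful lambda the doc warns about: the shared set would then be mutated by several threads without synchronization.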

So you are more than OK in the examples that you provide, be it parallel or not.

The expensive part of distinct (in parallel) comes from the fact that internally there has to be a thread-safe data structure that keeps the distinct elements; in the JDK's case it is a ConcurrentHashMap when the order does not matter, or a reduction using a LinkedHashSet when the order matters.
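
The two strategies can be approximated in user code. This is only a rough sketch under my own assumptions, not the JDK's actual implementation: DistinctSketch and its method names are hypothetical, and unlike the real distinct() the unordered variant does not accept null elements, because ConcurrentHashMap rejects them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class DistinctSketch {
    // Order does not matter: all threads add into one shared concurrent set.
    static <T> Set<T> distinctUnordered(Collection<T> in) {
        Set<T> seen = ConcurrentHashMap.newKeySet();
        in.parallelStream().forEach(seen::add);
        return seen;
    }

    // Order matters: each chunk fills its own LinkedHashSet, and the partial sets
    // are merged in encounter order, so the first occurrence of each element wins.
    static <T> List<T> distinctOrdered(Collection<T> in) {
        LinkedHashSet<T> merged = in.parallelStream()
                .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("b", "a", "b", "c", "a");
        System.out.println(distinctOrdered(words)); // [b, a, c]
        System.out.println(distinctUnordered(words).size()); // 3
    }
}
```

The ordered variant has to merge partial sets, which is cheap when there are few distinct values and costly when most elements are unique; the unordered variant avoids the merge but makes all threads contend on a single set.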

By the way, distinct is a pretty smart implementation: it checks whether the source of the stream is already distinct (in which case it is a no-op), or whether the data is sorted, in which case it does a smarter traversal of the source (since it knows that the next element is either the same as the one just seen or a different one), and otherwise it uses a ConcurrentHashMap internally, etc.

Eugene
  • I can see that `set` is used to maintain non-duplicates in case of `distinct` operation. In case of parallel streams it may consume the entire stream itself rather than going with CHM. Thoughts? – akhil_mittal Dec 12 '18 at 04:42
  • @i_am_zero not sure I follow u, care to expand your comment? – Eugene Dec 12 '18 at 04:47
  • Okay. I simply want to understand how parallel streams will work in case of `distinct`? Will it consume the entire input at one go (no benefit in this case) or will it use CHM? I could not find any official reference clarifying this. – akhil_mittal Dec 12 '18 at 04:54
  • @i_am_zero either way, the entire stream will not be consumed, at once. And you dont have to look at the implementation to understand that. `Stream.of(1,2,2,3,4).peek(System.out::println).distinct().peek(System.out::println).collect(Collectors.toList());` – Eugene Dec 12 '18 at 05:00
2

There won't be a problem (problem as in a wrong result) but as the API note says

Preserving stability for distinct() in parallel pipelines is relatively expensive

But if performance is a concern and stability is not (i.e. you don't mind the result having a different order of elements from the collection it processed), then you can follow the API note:

removing the ordering constraint with BaseStream.unordered() may result in significantly more efficient execution for distinct() in parallel pipelines,

I thought why not benchmark performance of parallel and sequential streams for distinct

import java.util.Arrays;
import java.util.List;
import java.util.Vector;
import java.util.stream.Collectors;

public class DistinctBenchmark {
    public static void main(String[] args) {
        List<String> strList = Arrays.asList("cat", "nat", "hat", "tat", "heart", "fat", "bat", "lad", "crab", "snob");

        List<String> words = new Vector<>();

        int wordCount = 1_000_000; // no. of words in the list words
        int avgIter = 10; // iterations to run to find average running time

        // populate the list with strings picked uniformly at random from `strList`
        for (int i = 0; i < wordCount; i++) {
            words.add(strList.get((int) (Math.random() * strList.size())));
        }

        // accumulate running times (ms) per variant; note there is no JIT warm-up,
        // so treat the numbers as rough indications only
        long starttime, pod = 0, pud = 0, sod = 0;
        for (int i = 0; i < avgIter; i++) {
            // parallel, ordered (a List source is ordered by default)
            starttime = System.currentTimeMillis();
            List<String> parallelOrderedDistinct = words.parallelStream().distinct().collect(Collectors.toList());
            pod += System.currentTimeMillis() - starttime;

            // parallel, ordering constraint explicitly dropped
            starttime = System.currentTimeMillis();
            List<String> parallelUnorderedDistinct =
                    words.parallelStream().unordered().distinct().collect(Collectors.toList());
            pud += System.currentTimeMillis() - starttime;

            // sequential, implicitly ordered
            starttime = System.currentTimeMillis();
            List<String> sequentialOrderedDistinct = words.stream().distinct().collect(Collectors.toList());
            sod += System.currentTimeMillis() - starttime;
        }

        System.out.println("Parallel ordered time in ms: " + pod / avgIter);
        System.out.println("Parallel unordered time in ms: " + pud / avgIter);
        System.out.println("Sequential implicitly ordered time in ms: " + sod / avgIter);
    }
}

The above was compiled with OpenJDK 8 and run on OpenJDK's JRE 8 (no JVM-specific arguments) on an i3 6th gen (4 logical cores), and I got the results listed below.

It seemed that after a certain number of elements, ordered parallel was the fastest and, ironically, parallel unordered was the slowest. The reason behind this (thanks to @Hulk) is the way it is implemented (using a HashSet). So a general rule would be: if you have only a few distinct elements and several orders of magnitude more duplicates, you might benefit from parallel().

1)

Parallel ordered time in ms: 52
Parallel unordered time in ms: 81
Sequential implicitly ordered time in ms: 35

2)

Parallel ordered time in ms: 48
Parallel unordered time in ms: 83
Sequential implicitly ordered time in ms: 34

3)

Parallel ordered time in ms: 36
Parallel unordered time in ms: 70
Sequential implicitly ordered time in ms: 32

Unordered parallel was roughly twice as slow as the other two.

Then I upped wordCount to 5_000_000 and these were the results

1)

Parallel ordered time in ms: 93
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 123

2)

Parallel ordered time in ms: 100
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 124

3)

Parallel ordered time in ms: 89
Parallel unordered time in ms: 365
Sequential implicitly ordered time in ms: 118

and then to 10_000_000

1)

Parallel ordered time in ms: 148
Parallel unordered time in ms: 725
Sequential implicitly ordered time in ms: 218

2)

Parallel ordered time in ms: 150
Parallel unordered time in ms: 749
Sequential implicitly ordered time in ms: 224

3)

Parallel ordered time in ms: 143
Parallel unordered time in ms: 743
Sequential implicitly ordered time in ms: 222
Ryotsu
  • 1
    Explanation: Your setup - Lots of duplicates of very few distinct strings favors the "parallel ordered"-implementation, because (at least in the version I'm looking at) that is implemented via parallel reduction to `LinkedHashSet`s - as these sets quickly converge to exactly your small set of input strings, the combining step is pretty fast. If duplicates were rare (only a few duplicates to eliminate), the outcome would likely be quite different. – Hulk Dec 06 '18 at 08:31
  • @Hulk I tried it out with 40k distinct elements (8!) and yes you're right. I'll put it in my answer. – Ryotsu Dec 06 '18 at 09:20
1

From the javadocs, parallelStream()

Returns a possibly parallel Stream with this collection as its source. It is allowable for this method to return a sequential stream.

Performance:

  1. Suppose we are lucky and really get a parallel stream whose chunks are handed to different CPU cores. An ArrayList<T> has an internal data representation based on an array, so it splits cheaply, whereas a LinkedList<T> needs more computation to be split for parallel processing. ArrayList<T> is better in this case!
  2. stream.unordered().parallel().distinct() has better performance than stream.parallel().distinct()

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead).

So, in your case it should not be a problem: a List is an ordered source, so the result only becomes unstable if you give up the ordering with unordered(). Read below for an explanation.

Let's say you have 4 elements in an ArrayList, {"a","b","a","b"}.

Now if you don't use parallelStream() before calling distinct(), only the strings at positions 0 and 1 are retained (a sequential stream preserves the order).

Otherwise, if you use parallelStream().unordered().distinct(), the elements at positions 1 and 2 may be retained as the distinct ones. It is unstable, but the result contains the same elements: it can be {"a","b"} or even {"b","a"}.

An unstable distinct operation keeps an arbitrary element out of each group of duplicates.

Finally,

under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data

Mohamed Anees A