
I've already read this and this question, but I still doubt whether the observed behavior of Stream.skip was intended by the JDK authors.

Let's have simple input of numbers 1..20:

List<Integer> input = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());

Now let's create a parallel stream, combine unordered() with skip() in different ways, and collect the result:

System.out.println("skip-skip-unordered-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .skip(1)
            .unordered()
            .collect(Collectors.toList()));
System.out.println("skip-unordered-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .skip(1)
            .collect(Collectors.toList()));
System.out.println("unordered-skip-skip-toList: "
        + input.parallelStream().filter(x -> x > 0)
            .unordered()
            .skip(1)
            .skip(1)
            .collect(Collectors.toList()));

The filtering step does essentially nothing here, but it makes things harder for the stream engine: the exact size of the output is no longer known, so some optimizations are turned off. I have the following results:

skip-skip-unordered-toList: [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// absent values: 1, 2
skip-unordered-skip-toList: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20]
// absent values: 1, 15
unordered-skip-skip-toList: [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 19, 20]
// absent values: 7, 18

The results are completely fine; everything works as expected. In the first case I asked to skip the first two elements, then collect to a list in no particular order. In the second case I asked to skip the first element, then turn the stream unordered and skip one more element (I don't care which one). In the third case I turned on unordered mode first, then skipped two arbitrary elements.

Now let's skip one element and collect to a custom collection in unordered mode. Our custom collection will be a HashSet:

System.out.println("skip-toCollection: "
        + input.parallelStream().filter(x -> x > 0)
        .skip(1)
        .unordered()
        .collect(Collectors.toCollection(HashSet::new)));

The output is satisfactory:

skip-toCollection: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// 1 is skipped

So in general I expect that as long as the stream is ordered, skip() skips the first elements; otherwise it skips arbitrary ones.
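For contrast, the ordered sequential case is fully deterministic. A minimal sketch (class name is mine):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SequentialSkipDemo {
    public static void main(String[] args) {
        // On a sequential (hence ordered) stream, skip(2) drops exactly
        // the first two elements of the encounter order.
        List<Integer> result = IntStream.rangeClosed(1, 20).boxed()
                .skip(2)
                .collect(Collectors.toList());
        System.out.println(result); // [3, 4, 5, ..., 20]
    }
}
```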

However, let's use an equivalent unordered terminal operation, collect(Collectors.toSet()):

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .unordered()
            .collect(Collectors.toSet()));

Now the output is:

skip-toSet: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 17, 18, 19, 20]
// 13 is skipped

The same result can be achieved with any other unordered terminal operation (like forEach, findAny, anyMatch, etc.). Removing the unordered() step in this case changes nothing. It seems that while the unordered() step correctly makes the stream unordered starting from the current operation, an unordered terminal operation makes the whole stream unordered from the very beginning, even though this can affect the result if skip() was used. This seems completely misleading to me: I expect that using an unordered collector is the same as turning the stream into unordered mode just before the terminal operation and using the equivalent ordered collector.

So my questions are:

  1. Is this behavior intended, or is it a bug?
  2. If it is intended, is it documented somewhere? I've read the Stream.skip() documentation: it does not say anything about unordered terminal operations. The Characteristics.UNORDERED documentation is also not very comprehensive and does not say that the ordering will be lost for the whole stream. Finally, the Ordering section in the package summary does not cover this case either. Probably I'm missing something?
  3. If it's intended that an unordered terminal operation makes the whole stream unordered, why does the unordered() step make it unordered only from that point on? Can I rely on this behavior, or was I just lucky that my first tests worked nicely?
Tagir Valeev
  • Don't have time to write a full answer, but note how the doc for the characteristics is phrased: "Indicates that **the collection operation** does not commit to preserving the encounter order of input elements." – mikołak Jun 15 '15 at 10:59
  • @mikołak, to my understanding the skipped element should not even appear among the collector input elements, because it was skipped before. So while this sentence states that input elements can be reordered it does not imply that the set of input elements can be changed. Probably it's about the definition of "input elements", but to my opinion it's assumed that it's the elements that accumulated by this collector, not source stream elements. – Tagir Valeev Jun 15 '15 at 11:06
  • But that's the thing - there's no "before". All of the previous operations are intermediate operations, and the stream reduction only happens once you perform the terminal op - collect in this case. – mikołak Jun 15 '15 at 11:13
  • @mikołak, by before I mean "before the element is fed to collector". When Collector accumulate method is called for some element, the previous intermediate operations are already performed for this element. See, if you perform a `filter()` operation, the filtered out element will never appear as the collector input element, because it was filtered out *before*. – Tagir Valeev Jun 15 '15 at 12:00
  • to challenge your intuition: what would you expect to happen in the case of `limit(10)` instead of `skip()` after the `filter()` on a parallel stream? – the8472 Jun 15 '15 at 12:07
  • @the8472: for ordered stream: first 10 elements left over after filtration. For unordered stream: any 10 elements left over after filtration. The question is whether the stream is ordered at this point. Nevertheless I did not catch any problems with `limit`. Surprisingly it works as expected even with unordered collector. Well probably more complex tests may reveal problems with `limit` as well, but I don't know such examples. – Tagir Valeev Jun 15 '15 at 12:27
  • As I already said [there](http://stackoverflow.com/questions/28259636/is-this-a-bug-in-files-lines-or-am-i-misunderstanding-something-about-paralle#comment48327618_28259636), the behavior would be more understandable if it were consistent. That still allows the behavior shown in that question to be intentional, but then we may consider the fact that the order is retained way too often as a bug. You know, `sorted().forEach()` should *not* sort. – Holger Jun 15 '15 at 13:14
  • Holger already pointed out a very good explanation. The difference between Collectors `toSet` and `toCollection` is that the first one is _unordered_ (as stated in the Javadocs) – Ruben Jun 16 '15 at 19:36
  • @Ruben, you probably don't understand my question. Roughly the problem is: why `unordered().collect(toCollection(HashSet::new))` behaves differently than `collect(toSet())`. Of course I know that `toSet()` is unordered. – Tagir Valeev Jun 17 '15 at 02:42
  • Is your initial line of code missing a `boxed()` call? I can't `collect()` like that without `boxed()`. – Thomas Weller Jun 18 '15 at 13:53
  • Is there any reason you keep `.parallelStream().filter(x -> x > 0)`? IMHO it can be removed without affecting the result. Just eliminates another potential source of problems... – Thomas Weller Jun 18 '15 at 13:56
  • @Thomas, thanks, `boxed()` added. The `.parallelStream().filter(x -> x > 0)` is necessary, because I want to *reveal* the problems, not *eliminate* them :-) Of course it's an artificial and simplified example. In practice such problem may occur if you use, for example, `bufferedReader.lines().skip(1).parallel().forEach(...)`. See the linked questions. – Tagir Valeev Jun 18 '15 at 15:33
  • Ok, I got it. You don't want a simple workaround or fix like most people here, which is great. Thanks for looking behind the scenes and figuring out root problems. – Thomas Weller Jun 19 '15 at 06:19
  • So, related to [my question](http://stackoverflow.com/questions/28521382/does-a-good-use-case-exist-for-skip-on-parallel-streams) (second link in your question), and now that it was admitted this was a bug, and that it was fixed as well, what do you think of my question: Does a good use case exist for `skip()` on parallel streams? – fps Sep 17 '15 at 14:06
  • @FedericoPeraltaSchaffner, well if you parse some text file with header line and have a high-Q processing, probably `lines.stream().skip(1).parallel().blahblah` will be efficient for you. – Tagir Valeev Sep 21 '15 at 05:22

2 Answers


Recall that the goal of stream flags (ORDERED, SORTED, SIZED, DISTINCT) is to enable operations to avoid doing unnecessary work. Examples of optimizations that involve stream flags are:

  • If we know the stream is already sorted, then sorted() is a no-op;
  • If we know the size of the stream, we can pre-allocate a correct-sized array in toArray(), avoiding a copy;
  • If we know that the input has no meaningful encounter order, we need not take extra steps to preserve encounter order.

Each stage of a pipeline has a set of stream flags. Intermediate operations can inject, preserve, or clear stream flags. For example, filtering preserves sorted-ness / distinct-ness but not sized-ness; mapping preserves sized-ness but not sorted-ness or distinct-ness. Sorting injects sorted-ness. The treatment of flags for intermediate operations is fairly straightforward, because all decisions are local.
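These flags are partly observable from the public API via the pipeline's spliterator. A small sketch (class name is mine) showing that unordered() clears the ORDERED characteristic for downstream stages:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class StreamFlagsDemo {
    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);

        // A stream over a List has a defined encounter order...
        System.out.println(list.stream().spliterator()
                .hasCharacteristics(Spliterator.ORDERED)); // true

        // ...and unordered() clears that flag for the rest of the pipeline.
        System.out.println(list.stream().unordered().spliterator()
                .hasCharacteristics(Spliterator.ORDERED)); // false
    }
}
```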

The treatment of flags for terminal operations is more subtle. ORDERED is the most relevant flag for terminal ops. And if a terminal op is UNORDERED, then we do back-propagate the unordered-ness.

Why do we do this? Well, consider this pipeline:

set.stream()
   .sorted()
   .forEach(System.out::println);

Since forEach is not constrained to operate in order, the work of sorting the list is completely wasted effort. So we back-propagate this information (until we hit a short-circuiting operation, such as limit), so as not to lose this optimization opportunity. Similarly, we can use an optimized implementation of distinct on unordered streams.
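The ordering constraint difference is visible in the API itself: forEachOrdered is constrained to the encounter order even on a parallel stream, while plain forEach is not. A small sketch (class name is mine):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class ForEachOrderDemo {
    public static void main(String[] args) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());

        // forEachOrdered respects the encounter order even in parallel,
        // so a preceding sort would NOT be wasted work here...
        IntStream.rangeClosed(1, 100).parallel().boxed().forEachOrdered(seen::add);
        System.out.println(seen.subList(0, 3)); // [1, 2, 3]

        // ...whereas plain forEach carries no ordering constraint,
        // which is why sorting before it is wasted effort.
    }
}
```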

Is this behavior intended, or is it a bug?

Yes :) The back-propagation is intended, as it is a useful optimization that should not produce incorrect results. However, the bug part is that we are propagating past a previous skip, which we shouldn't. So the back-propagation of the UNORDERED flag is overly aggressive, and that's a bug. We'll post a bug.

If it is intended, is it documented somewhere?

It should be just an implementation detail; if it were correctly implemented, you wouldn't notice (except that your streams would be faster).

Brian Goetz
  • Thank you! That's exactly an answer I was waiting for :-) I've implemented a [skipOrdered](https://github.com/amaembo/streamex/blob/8e097c4eaf63baec162424ef07c7b2cf1d669f72/src/main/java/javax/util/streamex/AbstractStreamEx.java#L1186) method in my library to workaround the bug. It takes the stream spliterator, converts it to sequential, performs `skip()`, then turns it back to `parallel()` if necessary. Hopefully the original `skip()` will be fixed in JDK9, so this method will become unnecessary. – Tagir Valeev Jun 18 '15 at 13:55
  • After some analysis we chose to back out back-propagation entirely. The only place where this pays off is optimizing away a sort; if you have a pipeline that sorts into an unordered terminal op, that's probably a user error anyway. – Brian Goetz Jun 21 '15 at 22:55
  • @Brian Goetz: just to get it correct, terminal operations won’t back-propagate the unordered attribute any more? So does that imply that there won’t be a difference between `forEach` and `forEachOrdered` in this regard? – Holger Sep 01 '15 at 12:03
  • @Holger Correct, there is no more back-propagation of terminal flags, meaning that the ordered-ness (or lack) of the terminal op does not affect the behavior of *earlier* operations. Of course, there is still a difference between `forEach` and `forEachOrdered`. – Brian Goetz Sep 01 '15 at 14:58

@Ruben, you probably don't understand my question. Roughly the problem is: why unordered().collect(toCollection(HashSet::new)) behaves differently than collect(toSet()). Of course I know that toSet() is unordered.

Probably, but anyway, I will give it a second try.

Having a look at the Javadocs of Collectors.toSet and Collectors.toCollection, we can see that toSet delivers an unordered collector:

This is an {@link Collector.Characteristics#UNORDERED unordered} Collector.

i.e., a CollectorImpl with the UNORDERED characteristic. Having a look at the Javadoc of Collector.Characteristics#UNORDERED, we can read:

Indicates that the collection operation does not commit to preserving the encounter order of input elements

In the Javadocs of Collector we can also see:

For concurrent collectors, an implementation is free to (but not required to) implement reduction concurrently. A concurrent reduction is one where the accumulator function is called concurrently from multiple threads, using the same concurrently-modifiable result container, rather than keeping the result isolated during accumulation. A concurrent reduction should only be applied if the collector has the {@link Characteristics#UNORDERED} characteristics or if the originating data is unordered

This means to me that, if we set the UNORDERED characteristic, we do not care at all about the order in which the elements of the stream are passed to the accumulator, and therefore the elements can be extracted from the pipeline in any order.

By the way, you get the same behavior if you omit the unordered() in your example:

System.out.println("skip-toSet: "
        + input.parallelStream().filter(x -> x > 0)
            .skip(1)
            .collect(Collectors.toSet()));

Furthermore, the Javadoc of Stream.skip() gives us a hint:

While {@code skip()} is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines

and

Using an unordered stream source (such as {@link #generate(Supplier)}) or removing the ordering constraint with {@link #unordered()} may result in significant speedups

When using

Collectors.toCollection(HashSet::new)

you are creating a normal "ordered" collector (one without the UNORDERED characteristic), which to me means that you do care about the ordering, and therefore the elements are extracted in order and you get the expected behavior.
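The difference between the two collectors can be checked directly by inspecting their advertised characteristics. A quick sketch (class name is mine):

```java
import java.util.HashSet;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CollectorCharacteristicsDemo {
    public static void main(String[] args) {
        Collector.Characteristics u = Collector.Characteristics.UNORDERED;

        // toSet() advertises the UNORDERED characteristic...
        System.out.println(
                Collectors.toSet().characteristics().contains(u)); // true

        // ...but toCollection(HashSet::new) does not, even though
        // the target container is itself unordered.
        System.out.println(
                Collectors.toCollection(HashSet::new).characteristics().contains(u)); // false
    }
}
```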

Ruben
  • Thank you for the attention to my problem, but this does not answer the question at all. The "For concurrent collectors" part is irrelevant as no collectors in question have `CONCURRENT` characteristic. I know that `toSet` is unordered, so it turns terminal operation to unordered mode, I mentioned this in question. I also mentioned that removing `unordered()` changes nothing, so I know about the same behavior when I omit the unordered(). I'm not speaking about performance, only about correctness, thus whether `skip()` is cheap or not is out of the question scope. – Tagir Valeev Jun 17 '15 at 06:14
  • The last quote talks about an "unordered stream source" or the `unordered()` intermediate operation. Those work perfectly fine. It says nothing about unordered terminal operations, which are what I have problems with. And of course I know that `Collectors.toCollection(HashSet::new)` is an ordered collector. – Tagir Valeev Jun 17 '15 at 06:16