
Environment: Ubuntu x86_64 (14.10), Oracle JDK 1.8u25

I am trying to use a parallel stream over Files.lines(), but I want to .skip() the first line (it's a CSV file with a header). Therefore I do this:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) {
    // etc
}

But then one column failed to parse to an int...

So I tried some simple code. The file in question is dead simple:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

And the code is equally simple:

public static void main(final String... args)
{
    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);
}

And I systematically get the following result (OK, I have only run it around 20 times):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

What am I missing here?


EDIT It seems that the problem, or misunderstanding, runs much deeper than that (the two examples below were cooked up by a fellow on FreeNode's ##java):

public static void main(final String... args)
{
    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);
}

This prints:

Hello
Hello

Uh.

@Holger suggested that this happens for any stream which is ORDERED and not SIZED with this other sample:

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

Also, it emerges from all the discussion that has already taken place that the problem (if it is one?) lies with .forEach() (as @SotiriosDelimanolis first pointed out).

fge
  • I think the problem is with `forEach`. – Sotirios Delimanolis Feb 01 '15 at 05:09
  • There's no room for misunderstanding---I just can't believe such a blatant bug is in the JDK. I tried your code with ten elements and guess what, the *tenth* element is skipped. – Marko Topolnik Feb 01 '15 at 21:05
  • @MarkoTopolnik hmm OK, so, a bug should be reported somewhat... However I'm not sure how to report it. It _is_ more general than `Files.lines()`, now I wonder whether this bug affects all `ORDERED` Streams or not – fge Feb 01 '15 at 21:38
  • I think the example from EDIT is more than enough to start the process. Using more than just two elements possibly accentuates the issue. – Marko Topolnik Feb 01 '15 at 22:22
  • You can simplify your second example to `Stream.of("Hello", "World").filter(x->true) .parallel().skip(1).forEach(System.out::println);` as any stream without predictable result size will do. – Holger Feb 02 '15 at 09:34
  • Is there anything else you want to know which is not explained by one of the answers below? Or in other words: http://i.imgur.com/QWV27Ap.jpg ;) – Nicolai Parlog Feb 27 '15 at 09:41
  • @NicolaiParlog I am still unsure as to whether this is on purpose or not. As already bountied, a "canonical" answer would let me, say, "rest at ease". Bountying again... – fge Feb 27 '15 at 10:39
  • Nice answers by Nicolai and Magnamag. I'll just add one bit of detail that was missed by those: the Stream implementation will back-propagate an UNORDERED characteristic up the pipeline where it can. This enables a computation like orderedStream().sorted().forEach() to optimize away the sort. (In other words, imagine `.forEach()` was really a macro for `.unordered().forEach()`). Understanding ordering in the context of parallelism is hard; we all have years of sequential bias that leads us to implicit assumptions about ordering. – Brian Goetz Mar 01 '15 at 21:24
  • @Brian Goetz: it *may* optimize away the `sort`, so it’s a pity that it doesn’t. Even when using `.parallel().sorted().forEach()`, the terminal consumer will process the items in arbitrary order, but only after the implementation wasted resources for sorting the items first. Since the sorting is still noticeable in a sequential context, when using `.sorted().forEach()`, it’s no wonder that developers are surprised when being told that “`.forEach()` makes the *entire pipeline* unordered”. It would be far better to understand if it really did, consistently. – Holger May 07 '15 at 17:51
  • I think part of this confusion comes from the fact that `.parallel()` makes the *whole pipeline* parallel (including operations before it). See http://mail.openjdk.java.net/pipermail/lambda-libs-spec-experts/2013-March/001504.html – Tavian Barnes Jun 01 '15 at 02:11

5 Answers

Since the current state of the issue is quite the opposite of the earlier statements made here, it should be noted that there is now an explicit statement by Brian Goetz that the back-propagation of the unordered characteristic past a skip operation is considered a bug. It is also stated that it is now considered best to have no back-propagation of the unordered characteristic of a terminal operation at all.

There is also a related bug report, JDK-8129120, whose status is “fixed in Java 9”, and the fix has been backported to Java 8, update 60.

I did some tests with jdk1.8.0_60 and it seems that the implementation now indeed exhibits the more intuitive behavior.
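Holger's sample from the question comments can double as a quick regression check; a minimal sketch, assuming you run it on a JDK containing the fix (8u60 or later):

```java
import java.util.stream.Stream;

public class SkipFixCheck {
    public static void main(String[] args) {
        // On a JDK with JDK-8129120 fixed (8u60+), skip(1) on this ordered
        // parallel pipeline reliably drops the first element in encounter
        // order, even though forEach itself is an unordered terminal op.
        Stream.of("Hello", "World")
              .filter(x -> true)  // hides the stream's size, as in the question
              .parallel()
              .skip(1L)
              .forEach(System.out::println); // prints "World" on fixed JDKs
    }
}
```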

Holger
  • Thanks for reminding this. I totally forgot to test 8u60. It's actually very interesting that all the upvoted answers insist that it's not a bug, while it is :-) – Tagir Valeev Sep 16 '15 at 16:14
  • "**This is a bug**" should be written in big letters in this answer! And everybody should vote for it! – Didier L Sep 16 '15 at 20:19
  • @DidierL after re-reading everything again and (I believe) having wrapped my head around all of this I agree with you. I have changed the accepted answer to this one. – fge Sep 17 '15 at 12:08

THIS ANSWER IS OUTDATED - READ THIS ONE INSTEAD!


To quickly answer the question: the observed behavior is intended! There is no bug and everything happens according to the documentation. But let it be said that this behavior should be documented and communicated better: it should be made more obvious that forEach ignores ordering.

I'll first cover the concepts which allow the observed behavior. This provides the background for dissecting one of the examples given in the question. I will do this on a high level and then again on a very low level.

[TL;DR: Read on its own, the high level explanation will give a rough answer.]

Concept

Instead of talking about Streams, which is the type operated on or returned by stream-related methods, let's talk about stream operations and stream pipelines. The method calls lines, skip and parallel are stream operations which build a stream pipeline[1] and - as others have noted - that pipeline is processed as a whole when the terminal operation forEach is called[2].

A pipeline could be thought of as a series of operations which, one after another, are executed on the whole stream (e.g. filter all elements, map remaining elements to numbers, sum all numbers). But this is misleading! A better metaphor is that the terminal operation pulls single elements through each operation[3] (e.g. get the next unfiltered element, map it, add it to sum, request next element). Some intermediate operations may need to traverse several (e.g. skip) or maybe even all (e.g. sort) elements before they can return the requested next element and this is one of the sources for state in an operation.

Each operation signals its characteristics with these StreamOpFlags:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

They are combined across the stream source, the intermediate operations and the terminal operation and make up the characteristics of the pipeline (as a whole), which are then used for optimizations[4]. Similarly, whether a pipeline is executed in parallel or not is a property of the entire pipeline[5].
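The "property of the entire pipeline" point can be observed directly with the public API; a small sketch (isParallel is part of BaseStream, so no internals are needed):

```java
import java.util.stream.Stream;

public class PipelineFlags {
    public static void main(String[] args) {
        // parallel() sets a flag on the shared pipeline state, so its
        // position among the intermediate operations does not matter.
        boolean before = Stream.of(1, 2, 3).parallel().map(x -> x).isParallel();
        boolean after  = Stream.of(1, 2, 3).map(x -> x).parallel().isParallel();
        System.out.println(before + " " + after); // true true
    }
}
```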

So whenever you are making assumptions regarding these characteristics, you have to look carefully at all operations building the pipeline, regardless of the order in which they are applied, and what guarantees they make. When doing so keep in mind how the terminal operation pulls each individual element through the pipeline.

Example

Let's look at this special case:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

High Level

Regardless of whether your stream source is ordered or not (it is), by calling forEach (instead of forEachOrdered) you declare that order doesn't matter to you[6], which effectively reduces skip from "skip the first n elements" to "skip any n elements"[7] (because without order the former becomes meaningless).

So you give the pipeline the right to ignore order if that promises a speedup. For parallel execution it apparently thinks so, which is why you get the observed output. Hence what you observe is the intended behavior and no bug.

Note that this does not conflict with skip being stateful! As described above, being stateful does not imply that it somehow caches the whole stream (minus the skipped elements) and everything that follows is executed on these elements. It just means that the operation has some state - namely the number of skipped elements (well, it's not actually that easy but with my limited understanding of what's going on, I'd say it's a fair simplification).
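To make the high-level point concrete with the example stream above: swapping forEach for forEachOrdered keeps the pipeline ordered, restoring the "skip the first n elements" meaning. This sketch is my own illustration, not part of the original answer:

```java
import java.io.BufferedReader;
import java.io.StringReader;

public class OrderedTerminalDemo {
    public static void main(String[] args) {
        // forEachOrdered declares that encounter order matters, so skip(1)
        // deterministically drops the first line "Foo", parallel or not.
        new BufferedReader(new StringReader("Foo\nBar")).lines()
                .skip(1L)
                .parallel()
                .forEachOrdered(System.out::println); // prints "Bar"
    }
}
```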

Low Level

Let's look at it in more detail:

  1. BufferedReader.lines creates the Stream; let's call it _lines
  2. .skip creates a new Stream, let's call it _skip:
    • calls ReferencePipeline.skip
    • which constructs a "slice" operation (generalization of skip & limit) with SliceOps.makeRef
    • this creates an anonymous instance of ReferencePipeline.StatefulOp, which references _lines as its source
  3. .parallel sets the parallel flag for the entire pipeline as described above
  4. .forEach actually starts the execution

So let's see how the pipeline is executed:

  1. Calling _skip.forEach creates a ForEachOp (let's call it _forEach) and hands it to _skip.evaluate, which does two things:
    1. calls sourceSpliterator to create a spliterator around the source for this pipeline stage (an UnorderedSliceSpliterator here; let's call it _sliceSpliterator)
    2. calls _forEach.evaluateParallel which creates a ForEachTask (because it is unordered; let's call it _forEachTask) and invokes it
  2. In _forEachTask.compute the task splits off the first 1024 lines, creates a new task for it (let's call it _forEachTask2), realizes there are no lines left and finishes.
  3. Inside the fork join pool, _forEachTask2.compute gets called, vainly tries to split again and finally starts copying its elements into the sink (a stream-aware wrapper around the System.out.println) by calling _skip.copyInto.
  4. This essentially delegates the task to the specified spliterator. This is the _sliceSpliterator which was created above! So _sliceSpliterator.forEachRemaining is responsible for handing the non-skipped elements to the println-sink:
    1. it gets a chunk (in this case all) of the lines into a buffer and counts them
    2. it tries to request as many permits (I assume due to parallelization) via acquirePermits
    3. with two elements in the source and one to be skipped, there is only one permit which it acquires (in general let's say n)
    4. it lets the buffer put the first n elements (so in this case only the first) into the sink

So UnorderedSliceSpliterator.OfRef.forEachRemaining is where the order is finally and truly ignored. I did not compare this to the ordered variant, but these are my assumptions as to why it is done this way:

  • under parallelization shoveling the spliterator's elements into the buffer may interleave with other tasks doing the same
  • this will make tracking their order extremely hard
  • doing that or preventing interleaving degrades performance and is pointless if order is irrelevant
  • if the order is lost, there is little else to do but to process the first n permitted elements

Any questions? ;) Sorry for going on for so long. Maybe I should leave out the details and make a blog post of it....

Sources

[1] java.util.stream - Stream operations and pipelines:

Stream operations are divided into intermediate and terminal operations, and are combined to form stream pipelines.

[2] java.util.stream - Stream operations and pipelines:

Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.

[3] This metaphor represents my understanding of streams. The main source, beside the code, is this quote from java.util.stream - Stream operations and pipelines (highlighting mine):

Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and summing can be fused into a single pass on the data, with minimal intermediate state. Laziness also allows avoiding examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", it is only necessary to examine just enough strings to find one that has the desired characteristics without examining all of the strings available from the source.

[4] java.util.stream.StreamOpFlag:

At each stage of the pipeline, a combined stream and operation flags can be calculated [... jadda, jadda, jadda about how flags are combined across source, intermediate and terminal operations ...] to produce the flags output from the pipeline. Those flags can then be used to apply optimizations.

In code you can see this in AbstractPipeline.combinedFlags, which is set during construction (and on a few other occurrences) by combining the flag of the previous and the new operation.

[5] java.util.stream - Parallelism (to which I can not directly link - scroll down a little):

When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the orientation of the stream on which it is invoked.

In code you can see this is in AbstractPipeline.sequential, parallel, and isParallel, which set/check a boolean flag on the stream source, making it irrelevant when the setters are called while constructing a stream.

[6] java.util.stream.Stream.forEach:

Performs an action for each element of this stream. [...] The behavior of this operation is explicitly nondeterministic.

Contrast this with java.util.stream.Stream.forEachOrdered:

Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.

[7] This is also not clearly documented but my interpretation of this comment on Stream.skip (heavily shortened by me):

[...] skip() [...] can be quite expensive on ordered parallel pipelines [...] since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. [...] [R]emoving the ordering constraint [...] may result in significant speedups of skip() in parallel pipelines

Nicolai Parlog
  • Woosh! That's some great digging... All in all you are researching why `forEach()` basically ignores what the javadoc calls "intermediate stateful operations" (like `.skip()` is); now, not sure whether that is the intended behavior, but in any case, +1 for the research and (source code!) link sprinkle – fge Feb 09 '15 at 22:03
  • No, I don't think `forEach` is ignoring anything it shouldn't. I rewrote some parts to make that clearer. (Sorry for the wall of text; I hope the structure makes it somewhat accessible). – Nicolai Parlog Feb 10 '15 at 13:57
  • Finally added all the details of what happens inside `forEach`. Now you can see the individual line where the order is finally screwed up. – Nicolai Parlog Feb 10 '15 at 16:48
  • This answer is amazing! One of the best I've seen in SO so far. I agree in that behavior is as expected and covered by the specification, despite one little inconsistency in skip() docs regarding *first* n elements, which doesn't make sense in an unordered traversal of anything. However, I have the feeling that everything would have been much better if skip() hadn't been allowed on parallel streams in the first place. – fps Feb 11 '15 at 12:52
  • @Magnamag my gut feeling is that in an ordered stream, like the ones `Files.lines()` produced, `.skip()` should be aware of the stream being ordered... – fge Feb 27 '15 at 10:58
  • @fge but the stream is parallel as well. Skipping something while processing in parallel doesn't make much sense, no matter if it's ordered or not. – fps Feb 27 '15 at 13:05
  • @Magnamag that's the thing: `.skip()` is supposed to be stateful. As I read it, given the code example I provided, I'd have expected that the parallel characteristic of the stream yield to .skip() first. And this doesn't seem to be the case, for no reason that I can see. – fge Feb 27 '15 at 13:25
  • @fge Agreed, observed behavior is counter-intuitive. But on the other side, it doesn't make sense to `skip()` on parallel streams. `skip()` is stateful, right, but what's the state of a parallel pipeline whose traversal order is not guaranteed to be the same as the encounter order of elements of the stream? Please see this question I asked inspired by your question and the answers: http://stackoverflow.com/questions/28521382/does-a-good-use-case-exist-for-skip-on-parallel-streams The *guy himself* answered, what an honour... – fps Feb 27 '15 at 14:38
  • **Stateful Skip.** It seems to me that `skip()` being stateful is interpreted as if it meant something like this: the operation sees the ordered stream source, skips the first `n` elements and makes the rest available to whatever follows. **This is not so.** It just means that skip has _some_ state - in this case it's more or less just the number of skipped elements, nothing more. It is still just some operation in the stream and is subject to the `ORDERED` flag as any other operation. – Nicolai Parlog Feb 27 '15 at 20:11
  • **Skip and Order.** `skip()` _will_ regard encounter order _if_ you create an ordered pipeline _but_ with `forEach` you create an unordered one. **This affects the whole pipeline** including `skip()`, which is then allowed to ignore order. This is true regardless of whether the pipeline is processed in parallel or not (it's an implementation detail that the behavior differs between sequential and parallel execution). This behavior is intended and the documentation says so - just not very clearly. – Nicolai Parlog Feb 27 '15 at 20:16
  • `vainly tries to split again`---this is what I'm interested in, even if it's not intimately involved in this topic. My understanding was that it *won't* try to split again because the logic is driven by the *target split size*, which is undefined for unsized sources. So it would be calculated as `(Long.MAX_VALUE/levelOfParallelism) >> 2`, still a huge number. – Marko Topolnik Mar 01 '15 at 17:47
  • @NicolaiParlog Agreed: "it's an implementation detail that the behavior differs between sequential and parallel execution". You're right. My point is that besides that, it makes little sense to skip something when it's being processed in parallel. – fps Mar 01 '15 at 21:28

The problem is that you are using a parallel stream together with forEach, and you are expecting the skip operation to rely on the correct element order, which is not the case here. An excerpt from the forEach documentation:

For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism.

I guess what basically happens is that the skip operation is first performed on the second line, not on the first. If you make the stream sequential or use forEachOrdered, you can see that it then produces the expected result. Another approach would be to use Collectors.
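The three fixes mentioned above could look roughly like this (a hedged sketch; on an affected JDK only these order-respecting variants are guaranteed to drop exactly the header line):

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.List;
import java.util.stream.Collectors;

public class SkipWorkarounds {
    private static BufferedReader reader() {
        return new BufferedReader(new StringReader("header\ndata"));
    }

    public static void main(String[] args) {
        // 1. Stay sequential: encounter order is always respected.
        reader().lines().skip(1L).forEach(System.out::println);

        // 2. Keep parallel, but use an order-respecting terminal operation.
        reader().lines().skip(1L).parallel().forEachOrdered(System.out::println);

        // 3. Collect instead of forEach; toList() respects encounter order.
        List<String> rest = reader().lines().skip(1L).parallel()
                .collect(Collectors.toList());
        System.out.println(rest);
    }
}
```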

Linas
  • I was writing just that :-) removing parallel does fix the issue (and parallelising the I/O operation probably does not do much performance wise anyway). – assylias Feb 01 '15 at 09:33
  • @assylias: removing `parallel` might fix the problem but only using `forEachOrdered` *guarantees* to do what’s intended. – Holger Feb 01 '15 at 10:26
  • Hmwell, I'll try, but on the other hand, a `Stream` produced by `Files.lines()` is pretty much ordered by definition, no? – fge Feb 01 '15 at 11:24
  • It doesn't matter that `forEach` ignores the order because `skip` [shouldn't](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#skip-long-), it's a stateful operation. And as @fge noted `Files.lines()` produces an ordered stream. Make it `.skip(1).limit(...)` and you no longer get the first line. According to your explanation you still would. – a better oliver Feb 01 '15 at 15:27
  • @Holger The guarantee to get just the second line is definitely there. Check my answer for the relevant specification quote. – Marko Topolnik Feb 01 '15 at 16:57
  • There are some misconceptions here. `skip` does not rely on "correct order". It will still skip elements but in an explicitly unordered stream pipeline (as we have here) "skipping the first _n_ elements" becomes "skipping any _n_ elements". Also, the file is not read in parallel. And this is no race condition (as your answer seems to suggest) but by design: tracking the order of elements under parallel traversal is hard. So it is simply not done in an unordered pipeline. Hence skipping is implemented easiest by only processing the first elements. – Nicolai Parlog Feb 10 '15 at 16:56
  • @NicolaiParlog I agree with you. I've edited my answer. – Linas Feb 10 '15 at 22:49

Let me quote something of relevance—the Javadoc of skip:

While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order.

Now, it is quite certain that Files.lines() has a well-defined encounter order and is an ORDERED stream (if it were not, there would be no guarantee even in sequential operation that the encounter order matches file order), therefore it is guaranteed that the resulting stream will deterministically consist of just the second line in your example.

Whether or not there is something else to this, the guarantee is definitely there.
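That the line stream reports the ORDERED characteristic can be checked via its spliterator; a small sketch using BufferedReader.lines() (which is what Files.lines() delegates to in JDK 8):

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Spliterator;

public class OrderedCharacteristicCheck {
    public static void main(String[] args) {
        Spliterator<String> sp = new BufferedReader(
                new StringReader("a\nb\nc")).lines().spliterator();
        // The line spliterator is created with Spliterator.ORDERED set,
        // so the stream has a well-defined encounter order.
        System.out.println(sp.hasCharacteristics(Spliterator.ORDERED)); // true
    }
}
```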

Marko Topolnik
  • The question is whether a stream on which `forEach` (instead of `forEachOrdered`) has been invoked, forms an “ordered parallel pipeline” (note the wording “pipeline” rather than “stream”). But I agree that this isn’t a sufficient specification if it’s really intended to define such counter-intuitive behavior. Btw., it seems to work in both directions, e.g. calling `.unordered().forEachOrdered(…)` seems to impose an *ordered* behavior. – Holger Feb 02 '15 at 09:56
  • From the semantical side, `forEach` could only allow arbitrary encounter order of the well-defined contents of the stream. It should not be allowed to transform an ordered stream into an unordered one. If the implementation does not respect that, then I would consider it broken. – Marko Topolnik Feb 02 '15 at 10:13
  • I’m not sure about it. After all, there is `forEachOrdered`, which maintains the ordering while an operation like “maintain ordering for skipping but not for the elements not skipped” would make no sense, not even for parallel performance. I guess, it would have been much better, if the methods were named `forEach` and `forEachUnordered` instead of `forEachOrdered` and `forEach` as the latter of the two methods is the one you should think twice before using. But I’m really looking forward to read the final words of the API designers, once they show up… – Holger Feb 02 '15 at 10:22
  • I agree that it doesn't make much sense to offer the semantics of ordered skipping with unordered traversal; however the API does not appear to offer precisely that. `forEach` should be the one which doesn't change anything about the stream's definition (you may or may not get ordering); `forEachOrdered` would be allowed to force the pipeline into the ordered configuration. As it appears, `forEach` is actually forcing the pipeline into unordered configuration and I find that unacceptable. – Marko Topolnik Feb 02 '15 at 10:32
  • Note that what you expect for `forEach` (“`forEach` should be the one which doesn't change anything about the stream's definition (you may or may not get ordering)” is exactly what has been specified for [`forEachOrdered`](http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEachOrdered-java.util.function.Consumer-): “Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order.” I’m afraid, the best we’ll get is a “you’re right, it should be the other way round, but now we have to remain compatibility” – Holger Feb 03 '15 at 18:35
  • My comment was imprecise in the part about `forEach`... `forEachOrdered` cannot invent ordering where there wasn't one to begin with, and that's what the specification must allow for---but it does force the pipeline into respecting encounter order if present. That much is aligned with my earlier comment. I was wrong that "`forEach` is forcing the pipeline into unordered configuration"---what it actually does is *not* force it into ordered configuration, even when the stream is ordered. – Marko Topolnik Feb 03 '15 at 19:02
  • However, the user-visible effect is that `forEach` does *something* to the stream (change its semantics) whereas `forEachOrdered` "leaves it alone". That's basically what you said. So we have a conflict between the FJ internals, where `forEach` is the "less intrusive" operation, and the user's perspective, where `forEachOrdered` is less intrusive. Obviously, the API should have followed the user's perspective. – Marko Topolnik Feb 03 '15 at 19:04

I have an idea of how to work around this problem, which I don't see in the previous discussion: you can recreate the stream, splitting the pipeline into two pipelines while keeping the whole thing lazy.

public static <T> Stream<T> recreate(Stream<T> stream) {
    return StreamSupport.stream(stream.spliterator(), stream.isParallel())
                        .onClose(stream::close);
}

public static void main(String[] args) {
    recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
        .skip(1).parallel()).forEach(System.out::println);
}

When you recreate the stream from the initial stream's spliterator, you effectively create a new pipeline. In most cases recreate will work as a no-op, but the point is that the first and second pipelines don't share their parallel and unordered states. So even if you are using forEach (or any other unordered terminal operation), only the second stream becomes unordered.

Internally, a pretty similar effect can be achieved by concatenating your stream with an empty stream:

Stream.concat(Stream.empty(), 
    new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
          .lines().skip(1).parallel()).forEach(System.out::println);

Though it has a little more overhead.

Tagir Valeev
  • Tagir, I've tested your work-around, and it certainly works. However, while I find it amazing, I don't understand why it actually works. I mean, what's the mechanism that makes it work? Why creating another pipeline with the original stream's spliterator and with the same parallel status as the original stream makes the second pipeline actually skip the very first *sequential* element of the first stream? – fps Sep 16 '15 at 22:11
  • @FedericoPeraltaSchaffner, because the [bug](https://bugs.openjdk.java.net/browse/JDK-8129120) cause was incorrect back-propagation: stream flags like "unordered" incorrectly back-propagated to the previous operations. However back-propagation doesn't work through stream recreation. So I simply forcibly stopped the back-propagation with this trick. Note also Holger's answer: it's fixed in 8u60. – Tagir Valeev Sep 17 '15 at 01:39