
Talking about Streams, when I execute this piece of code

import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

I get this output

A1B1C1
A2B2C2
A3B3C3

because limiting my stream to the first three components forces actions A, B and C to be executed only three times.

Trying to perform an analogous computation on the last three elements using the skip() method shows a different behaviour: this

import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .skip(6)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

outputs this

A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9

Why, in this case, are actions A1 to A6 executed? It must have something to do with the fact that limit is a short-circuiting stateful intermediate operation while skip is not, but I don't understand the practical implications of this property. Is it just that "every action before skip is executed while not everyone before limit is"?

Tagir Valeev
Luigi Cortese
    It's more about the lazy operation of the streams - items are not produced until they are required. – RealSkeptic Sep 05 '15 at 14:23
  • @RealSkeptic uhm.. if you want to elaborate a bit more, I'll be happy to learn =) – Luigi Cortese Sep 05 '15 at 14:40
    Well, `skip()` is a stateful intermediate operation. But, according to [the Javadoc](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html), short-circuiting operations are ones that may process an infinite stream in a finite amount of time. This sort of makes sense, as `skip()` may have to create a different infinite stream, but `limit()` just has to take the first few. I don't know why this difference (if that's even it) would affect your output, though. – bcsb1001 Sep 05 '15 at 14:49

5 Answers


What you have here are two stream pipelines.

These stream pipelines each consist of a source, several intermediate operations, and a terminal operation.

But the intermediate operations are lazy. This means that nothing happens unless a downstream operation requires an item. When it does, then the intermediate operation does all it needs to produce the required item, and then again waits until another item is requested, and so on.

The terminal operations are usually "eager". That is, they ask for all the items in the stream that are needed for them to complete.

So you should really think of the pipeline as the forEach asking the stream behind it for the next item, and that stream asks the stream behind it, and so on, all the way to the source.
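The request-driven picture above can be modelled with plain `java.util.Iterator`s. This is a minimal sketch of the mental model, not the JDK's actual implementation; the class `PullModel` and its helpers `peek`, `limit`, `skip` and `run` are hypothetical names chosen for illustration. Each stage calls `next()` on its upstream only when its own `next()` is called, and draining the last stage in a loop plays the role of `forEach`. Both traces from the question come out (without the line breaks):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

// Hypothetical pull model: a stage asks its upstream for an item only when
// its own downstream asks it for one. Not the JDK's internal implementation.
public class PullModel {

    // Runs `action` on every item that is pulled through this stage.
    static <T> Iterator<T> peek(Iterator<T> up, Consumer<T> action) {
        return new Iterator<T>() {
            public boolean hasNext() { return up.hasNext(); }
            public T next() {
                T t = up.next();
                action.accept(t);
                return t;
            }
        };
    }

    // Hands out at most n items; once n are given it never asks upstream again.
    static <T> Iterator<T> limit(Iterator<T> up, long n) {
        return new Iterator<T>() {
            long given = 0;
            public boolean hasNext() { return given < n && up.hasNext(); }
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                given++;
                return up.next();
            }
        };
    }

    // On the first request, pulls n items from upstream and drops them.
    static <T> Iterator<T> skip(Iterator<T> up, long n) {
        return new Iterator<T>() {
            boolean done = false;
            void skipOnce() {
                if (done) return;
                for (long i = 0; i < n && up.hasNext(); i++) {
                    up.next(); // pulled (so upstream peeks run) but not passed on
                }
                done = true;
            }
            public boolean hasNext() { skipOnce(); return up.hasNext(); }
            public T next() { skipOnce(); return up.next(); }
        };
    }

    // Builds source -> peek A -> (limit 3 | skip 6) -> peek B and drains it like forEach.
    static String run(boolean useLimit) {
        StringBuilder out = new StringBuilder();
        Iterator<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9).iterator();
        Iterator<Integer> a = peek(source, x -> out.append("A").append(x));
        Iterator<Integer> middle = useLimit ? limit(a, 3) : skip(a, 6);
        Iterator<Integer> b = peek(middle, x -> out.append("B").append(x));
        while (b.hasNext()) {          // the "forEach": keep requesting items
            Integer item = b.next();   // this request travels back towards the source
            out.append("C").append(item);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // A1B1C1A2B2C2A3B3C3
        System.out.println(run(false));  // A1A2A3A4A5A6A7B7C7A8B8C8A9B9C9
    }
}
```

The only difference between the two runs is the middle stage: `limit` answers "no more items" without asking upstream, while `skip` has to pull six items through the "A" peek before it can hand out its first one.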

With that in mind, let's see what we have with your first pipeline:

Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));

So, the forEach is asking for the first item. That means the "B" peek needs an item, and asks the limit output stream for it, which means limit will need to ask the "A" peek, which goes to the source. An item is given, and goes all the way up to the forEach, and you get your first line:

A1B1C1

The forEach asks for another item, then another. And each time, the request is propagated up the stream, and performed. But when forEach asks for the fourth item, when the request gets to the limit, it knows that it has already given all the items it is allowed to give.

Thus, it is not asking the "A" peek for another item. It immediately indicates that its items are exhausted, and thus, no more actions are performed and forEach terminates.
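Because `limit` stops requesting items, even an infinite source is fine in front of it. A small check, assuming OpenJDK's sequential stream implementation (the class and field names are made up; `Stream.iterate`, `peek`, `limit` and `Collectors.toList` are the standard API). The `peek` Javadoc describes it as mainly a debugging aid, so this is for observing the pipeline, not for production logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitShortCircuit {
    // Everything the "A" peek gets to see.
    static final List<Integer> seenByA = new ArrayList<>();

    static List<Integer> firstThree() {
        seenByA.clear();
        return Stream.iterate(1, x -> x + 1)   // infinite source: 1, 2, 3, ...
                .peek(seenByA::add)            // the "A" peek
                .limit(3)                      // after 3 items, no more requests go upstream
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = firstThree();
        System.out.println("result: " + result);   // [1, 2, 3]
        System.out.println("A saw:  " + seenByA);  // [1, 2, 3]
    }
}
```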

What happens in the second pipeline?

    Stream.of(1,2,3,4,5,6,7,8,9)
    .peek(x->System.out.print("\nA"+x))
    .skip(6)
    .peek(x->System.out.print("B"+x))
    .forEach(x->System.out.print("C"+x));

Again, forEach is asking for the first item. This is propagated back. But when it gets to the skip, it knows it has to pull and discard 6 items from its upstream before it can pass one downstream. So it requests an item from the upstream "A" peek, consumes it without passing it downstream, makes another request, and so on. So the "A" peek gets 6 requests for an item and produces 6 prints, but these items are not passed down.

A1
A2
A3
A4
A5
A6

On the 7th request made by skip, the item is passed down to the "B" peek and from it to the forEach, so the full print is done:

A7B7C7

Then it's just like before. The skip will now, whenever it gets a request, ask for an item upstream and pass it downstream, as it "knows" it has already done its skipping job. So the rest of the prints are going through the entire pipe, until the source is exhausted.
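The same experiment with `skip` shows the extra upstream requests. In this sketch (hypothetical class name, OpenJDK sequential streams assumed) a `limit(3)` is added after `skip(6)` only so that the infinite source terminates; the "A" peek still sees the six skipped elements:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipOnInfiniteSource {
    // Everything the "A" peek gets to see.
    static final List<Integer> seenByA = new ArrayList<>();

    static List<Integer> skipSixTakeThree() {
        seenByA.clear();
        return Stream.iterate(1, x -> x + 1)   // infinite source: 1, 2, 3, ...
                .peek(seenByA::add)            // the "A" peek
                .skip(6)                       // pulls 1..6 from upstream and drops them
                .limit(3)                      // only here so the infinite stream terminates
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("result: " + skipSixTakeThree()); // [7, 8, 9]
        System.out.println("A saw:  " + seenByA);            // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```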

RealSkeptic
  • I like the picture of requests going back and forward, I think you got the point here. I'll do some tests on that... – Luigi Cortese Sep 05 '15 at 15:07
    One important point is that stream stages don’t have random access, thus `skip` can’t really skip but has to iterate over the requested number of elements, dropping them. In theory, it could attempt to split the source, esp. when the source is SIZED and it can be predicted that more than the half ought to be skipped. But there is no such optimization in the current implementation. – Holger Sep 07 '15 at 08:57
  • Very nice explanation. – Udayaditya Barua May 18 '18 at 12:13
  • *when the request gets to the `limit`, it knows that it has already given all the items it is allowed to give.* Apparently it knows even earlier, because it doesn't call `peek` for the 7th element. In theory, `skip` should be able to similarly pass over results at the source, as long as there are no intervening `filter`s or `flatMap`s to change the downstream cursor. – shmosel Mar 02 '21 at 23:00

The fluent notation of the streamed pipeline is what's causing this confusion. Think about it this way:

limit(3)

All the pipelined operations are evaluated lazily, except forEach(), which is a terminal operation that triggers "execution of the pipeline".
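A quick way to see this laziness (a sketch; class and field names are hypothetical): building the pipeline runs nothing, and the `peek` only fires once the terminal operation starts pulling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazyUntilTerminal {
    // Everything the "A" peek gets to see.
    static final List<Integer> seenByA = new ArrayList<>();

    // Returns {elements peeked before forEach, elements peeked after forEach}.
    static int[] peekCounts() {
        seenByA.clear();
        Stream<Integer> pipeline = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .peek(seenByA::add)
                .limit(3);                // only describes the pipeline, runs nothing
        int before = seenByA.size();      // 0: no terminal operation yet
        pipeline.forEach(x -> { });       // terminal operation: now items are pulled
        int after = seenByA.size();       // 3: limit(3) stopped further requests
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] counts = peekCounts();
        System.out.println("before forEach: " + counts[0] + ", after: " + counts[1]);
    }
}
```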

When the pipeline is executed, the intermediate stream definitions will not make any assumptions about what happens "before" or "after". All they do is take an input stream and transform it into an output stream:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 contains 9 different Integer values.
  • s2 peeks at all values that pass it and prints them.
  • s3 passes the first 3 values to s4 and aborts the pipeline after the third value. No further values are produced by s3. This doesn't mean that no more values are in the pipeline. s2 would still produce (and print) more values, but no one requests those values and thus execution stops.
  • s4 again peeks at all values that pass it and prints them.
  • forEach consumes and prints whatever s4 passes to it.

Think about it this way. The whole stream is completely lazy. Only the terminal operation actively pulls new values from the pipeline. After it has pulled 3 values from s4 <- s3 <- s2 <- s1, s3 will no longer produce new values and it will no longer pull any values from s2 <- s1. While s1 -> s2 would still be able to produce 4-9, those values are just never pulled from the pipeline, and thus never printed by s2.

skip(6)

With skip() the same thing happens:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("\nA"+x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));

s4.forEach(x->System.out.print("C"+x));
  • s1 contains 9 different Integer values.
  • s2 peeks at all values that pass it and prints them.
  • s3 consumes the first 6 values, "skipping them", which means the first 6 values aren't passed to s4, only the subsequent values are.
  • s4 again peeks at all values that pass it and prints them.
  • forEach consumes and prints whatever s4 passes to it.

The important thing here is that s2 is not aware of the remaining pipeline skipping any values. s2 peeks at all values independently of what happens afterwards.

Another example:

Consider this pipeline, which is listed in this blog post

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .distinct()
         .limit(10)
         .forEach(System.out::println);

When you execute the above, the program will never halt. Why? Because:

IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);

i3.forEach(System.out::println);

Which means:

  • i1 generates an infinite amount of alternating values: 0, 1, 0, 1, 0, 1, ...
  • i2 discards all values that have been encountered before, passing on only "new" values, i.e. there are a total of 2 values coming out of i2.
  • i3 passes on 10 values, then stops.

This algorithm will never stop, because i3 waits for i2 to produce 8 more values after 0 and 1, but those values never appear, while i1 never stops feeding values to i2.

It doesn't matter that at some point in the pipeline, more than 10 values had been produced. All that matters is that i3 has never seen those 10 values.
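Two variations of the blog post's pipeline (not from the post itself, just an illustration of the same reasoning) do terminate, because in each one the `limit` either receives all the values it asks for, or bounds the infinite source before `distinct` sees it. Class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class DistinctAndLimit {
    // Terminates: distinct() lets 0 and 1 through, and limit(2) needs no more than that.
    static int[] distinctThenLimit2() {
        return IntStream.iterate(0, i -> (i + 1) % 2)
                .distinct()
                .limit(2)
                .toArray();
    }

    // Also terminates: limit(10) cuts the infinite source before distinct() sees it.
    static int[] limit10ThenDistinct() {
        return IntStream.iterate(0, i -> (i + 1) % 2)
                .limit(10)
                .distinct()
                .toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distinctThenLimit2()));  // [0, 1]
        System.out.println(Arrays.toString(limit10ThenDistinct())); // [0, 1]
    }
}
```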

To answer your question:

Is it just that "every action before skip is executed while not everyone before limit is"?

Nope. All operations before either skip() or limit() are executed. In both of your executions, you get A1 - A3. But limit() may short-circuit the pipeline, aborting value consumption once the event of interest (the limit is reached) has occurred.

Lukas Eder
  • how would you explain the first example of ```limit``` according to your explanation of the second example? – Amm Sokun Sep 05 '15 at 14:56
  • exactly @AmmSokun, that's my doubt. If "s2 is not aware of the remaining pipeline skipping any values", why would s2 be aware of the remaining pipeline limiting any values instead? Edit: reading YOUR edit now... =) – Luigi Cortese Sep 05 '15 at 14:58
    @LuigiCortese: I've clarified some parts in bold. `limit()` will "short-circuit the stream", meaning that the terminal operation can consume no more values, and thus, execution stops. Just like `findFirst()` – Lukas Eder Sep 05 '15 at 15:06
  • @LukasEder What is still confusing me is that you say, in **limit(3)** example, that "s2 peeks at all values that pass it and *prints them*". But that's not really happening, having no *A4, A5 ... A9* in the output – Luigi Cortese Sep 05 '15 at 15:16
    @LuigiCortese, those values no longer pass `s2`, because the pipeline is "short-circuited" by `limit(3)`, once 3 values have been passed. The terminal operation `forEach()` stops consuming further values from `s4`, which consumes them from `s3`, which consumes them from `s2`, which consumes them from `s1`. Thus, the values 4-9 are never consumed. – Lukas Eder Sep 05 '15 at 15:19
  • Ok, so when you say "*s1 contains 9 different Integer values --> s2 peeks at all values that pass it and prints them*" you mean that s2 WILL print all the values that the terminal operation will ask for (1, 2 and 3 in this case), and not that "*s2 is going to print all the 9 values that come from s1*", am I right? – Luigi Cortese Sep 05 '15 at 15:27
    That's what I mean. Otherwise, `limit()` would be useless, if the pipeline before it still consumes the entire possibly infinite stream – Lukas Eder Sep 05 '15 at 15:30
  • Perfect! If you had focused more on this aspect of "requesting values" backward, from the terminal to the first operation it would have been clear to me since the beginning, that's exactly what i was missing. Thanks for the patience! – Luigi Cortese Sep 05 '15 at 15:35
    Yep I realised that, which is why I added that explanation to the answer too, for future visitors – Lukas Eder Sep 05 '15 at 15:36
  • @LukasEder it is totally wrong to say each stream operation is executed independently – Amm Sokun Sep 05 '15 at 15:56
    @AmmSokun well, I think it's a matter of definition: what does *independently* mean in this context? – Luigi Cortese Sep 05 '15 at 16:32
    @AmmSokun: The "total" quantifier is "mostly" wrong, too. But I see your point and I'll review my answer accordingly – Lukas Eder Sep 06 '15 at 09:55

It is complete blasphemy to look at stream operations individually, because that is not how a stream is evaluated.

Talking about limit(3): it is a short-circuiting operation, which makes sense if you think about it. Whatever operations come before and after it, a limit in a stream stops the iteration once n elements have reached the limit operation, but this doesn't mean that only n stream elements are processed. Take this different stream pipeline as an example:

import java.util.stream.Stream;

public class App
{
    public static void main(String[] args) {
        Stream.of(1,2,3,4,5,6,7,8,9)
        .peek(x->System.out.print("\nA"+x))
        .filter(x -> x%2==0)
        .limit(3)
        .peek(x->System.out.print("B"+x))
        .forEach(x->System.out.print("C"+x));
    }
}

would output

A1
A2B2C2
A3
A4B4C4
A5
A6B6C6

which seems right, because limit waits for 3 stream elements to pass through the operation chain, even though 6 elements of the stream are processed.
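The same effect with an infinite source (hypothetical class name; OpenJDK sequential streams assumed): `limit(3)` stops the pipeline as soon as three elements have reached it, however many the "A" peek had to see before that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterThenLimit {
    // Everything the "A" peek gets to see.
    static final List<Integer> seenByA = new ArrayList<>();

    static List<Integer> firstThreeEven() {
        seenByA.clear();
        return Stream.iterate(1, x -> x + 1)   // infinite source: 1, 2, 3, ...
                .peek(seenByA::add)            // the "A" peek sees odd and even numbers
                .filter(x -> x % 2 == 0)       // drops the odd ones
                .limit(3)                      // stops once 3 even numbers have arrived
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("result: " + firstThreeEven()); // [2, 4, 6]
        System.out.println("A saw:  " + seenByA);          // [1, 2, 3, 4, 5, 6]
    }
}
```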

Amm Sokun
  • @RealSkeptic's answer removed any doubts. Now I understand your example as well: an indefinite number of elements may be processed, until a maximum of 3 reach the terminal operation. And I agree with you, looking at individual stream operations does not help at all. – Luigi Cortese Sep 05 '15 at 16:12

All streams are based on spliterators, which basically have two operations: advance (move forward one element, similar to an iterator) and split (divide itself at an arbitrary position, which is suitable for parallel processing). You can stop taking input elements at any moment you like (which is what limit does), but you cannot just jump to an arbitrary position (there's no such operation in the Spliterator interface). Thus the skip operation needs to actually read the first elements from the source just to ignore them. Note that in some cases you can perform an actual jump:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);

list.stream().skip(3)... // will read 1,2,3, but ignore them
list.subList(3, list.size()).stream()... // will actually jump over the first three elements
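A runnable version of the idea above (hypothetical class and method names): skipping at the `Spliterator` level means calling `tryAdvance` and dropping what it delivers, whereas `subList` gives a view that already starts at the wanted index, so its stream never reads the first elements. `Spliterator` also has `trySplit`, but the split point is chosen by the spliterator, not by the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;

public class SkipVersusJump {
    static final List<Integer> LIST = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    static final List<Integer> readAndDropped = new ArrayList<>();
    static final List<Integer> seenViaSubList = new ArrayList<>();

    // Skipping by hand: the only way past an element is tryAdvance, which reads it.
    static List<Integer> skipByAdvancing(int n) {
        readAndDropped.clear();
        Spliterator<Integer> sp = LIST.spliterator();
        int i = 0;
        while (i < n && sp.tryAdvance(readAndDropped::add)) {
            i++;
        }
        List<Integer> rest = new ArrayList<>();
        sp.forEachRemaining(rest::add);
        return rest;
    }

    // subList(n, size) is a view starting at index n, so its stream never reads 1..n.
    static List<Integer> jumpViaSubList(int n) {
        seenViaSubList.clear();
        return LIST.subList(n, LIST.size()).stream()
                .peek(seenViaSubList::add)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(skipByAdvancing(3) + ", read and dropped: " + readAndDropped);
        System.out.println(jumpViaSubList(3) + ", peek saw: " + seenViaSubList);
    }
}
```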
Tagir Valeev

Maybe this little diagram helps to get some natural "feeling" for how the stream is processed.

In the diagram below, the stream is depicted as a pipe in which the elements 1..8 flow from left to right. There are three "windows":

  1. In the first window (peek A) you see everything
  2. In the second window (skip 6 or limit 3) a kind of filtering is done: either the first or the last elements are "eliminated", meaning they are not passed on for further processing.
  3. In the third window you see only those items that were passed on

    ▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸   ▸▸▸▸▸▸▸▸▸▸▸   ▸▸▸▸▸▸▸▸▸▸   ▸▸▸▸▸▸▸▸▸
      8  7  6  5  4  3  2  1
    ▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸
                                │             │            │
                                │           skip 6         │
                              peek A       limit 3       peek B

Probably not everything (maybe not even anything) in this explanation is technically completely correct. But when I see it like this it's quite clear to me what items reach which of the concatenated instructions.

yaccob