
Note: I am not necessarily looking for solutions to the concrete example problems described below. I am genuinely interested in why this isn't possible out of the box in Java 8.


Java streams are lazy. At the very end they have a single terminal operation.
My interpretation is that this terminal operation will pull all the values through the stream. None of the intermediate operations can do that. Why are there no intermediate operations that pull an arbitrary number of elements through the stream? Something like this:

stream
    .mapMultiple(this::consumeMultipleElements) // or groupAndMap or combine or intermediateCollect or reverseFlatMap
    .collect(Collectors.toList());

When a downstream operation tries to advance the stream once, the intermediate operation might try to advance the upstream multiple times (or not at all).

I can see a couple of use cases:
(These are just examples. As you can see, it is certainly possible to handle these use cases, but it is "not the streaming way", and these solutions lack the desirable laziness that Streams have.)

  • Combine multiple elements into a single new element to be passed down the rest of the stream. (E.g., making pairs (1,2,3,4,5,6) ➔ ((1,2),(3,4),(5,6)))

    // Something like this,
    // without needing to consume the entire stream upfront,
    // and also more generic. (The combiner should decide for itself how many elements to consume/combine per resulting element. Maybe the combiner is a Consumer<Iterator<E>> or a Consumer<Supplier<E>>)
    public <E, R> Stream<R> combine(Stream<E> stream, BiFunction<E, E, R> combiner) {
        List<E> completeList = stream.collect(toList());
        return IntStream.range(0, completeList.size() / 2)
            .mapToObj(i -> combiner.apply(
                    completeList.get(2 * i),
                    completeList.get(2 * i + 1)));
    }
    
  • Determine if the Stream is empty (mapping the Stream to an Optional non-empty Stream)

    // Something like this, without needing to consume the entire stream
    public <E> Optional<Stream<E>> toNonEmptyStream(Stream<E> stream) {
        List<E> elements = stream.collect(toList());
        return elements.isEmpty()
            ? Optional.empty()
            : Optional.of(elements.stream());
    }
    
  • Having a lazy Iterator that doesn't terminate the stream (allowing you to skip elements based on more complicated logic than just skip(long n)).

    Iterator<E> iterator = stream.iterator();
    // Allow this without throwing a "java.lang.IllegalStateException: stream has already been operated upon or closed"
    stream.collect(toList());
    

When they designed Streams and everything around them, did they forget about these use cases or did they explicitly leave this out?
I understand that these might give unexpected results when dealing with parallel streams, but in my opinion this is a risk that can be documented.

neXus
  • Perhaps the name of the intermediate operation confuses you. I changed it to be more clear. The `consumeMultipleElements` would fetch some arbitrary amount of elements from the upstream and combine them into a single element which would be passed downstream. In that sense it is like some sort of reverse flatMap. The terminal collect operation would collect all the results from this intermediate mapping. – neXus Nov 30 '17 at 14:01
  • Well, in this case, you have two questions in one. There is no operation allowing this kind of “reverse flatMap”, not as intermediate operation nor as terminal operation. So we should first have this kind of operation as terminal operation before we start asking “why not as intermediate?”. The second example is not very convincing. The empty stream doesn’t do anything anyway, regardless of what you chain, so what do you gain from the `Optional`? The third… is already possible, as you can create a new `Stream` from the `Iterator` accepting all the performance disadvantages such an approach has. – Holger Nov 30 '17 at 16:09
  • As for **example 1**: I think that maybe `Collectors.collectingAndThen` might be simplified this way syntactically, and `Collectors.groupingBy()` seems related. As for **example 2**: You could use `Optional.orElseGet(Supplier)` to provide another source of data to do the rest of the processing on. This case was actually one of the things that got me thinking in the first place and eventually led to me asking this broader question. As for **example 3**: that is good to know. I facepalmed for not realising that solution before. Thanks! – neXus Nov 30 '17 at 16:59
  • You're also forgetting parallelism. The Stream library was designed to allow any pipeline to operate either sequentially or in parallel. You don't know which way you're going to go until you reach the terminal operation. Think of the source + intermediate operations defining the stream contents, and the terminal operation defining what you want to do to those contents, and how (short-circuit or not, parallel or not, etc.) – Brian Goetz Nov 30 '17 at 19:23

2 Answers


Well, all of the operations that you want are actually achievable with the Stream API, just not out of the box.

To combine multiple elements into pairs, you need a custom Spliterator. Tagir Valeev has shown how to do exactly that. He is also the author of StreamEx, an absolute beast of a library that does many other useful things that are not supported out of the box.
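A minimal sketch of such a pairing Spliterator, written for this answer (it is not Tagir Valeev's code; `LazyPairs` and `combine` are hypothetical names). It assumes sequential processing and, like the question's `combine`, drops a trailing unpaired element. Each downstream request pulls exactly two elements from upstream, so nothing is consumed up front.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyPairs {
    // Lazily combines consecutive pairs: every downstream request pulls two upstream elements.
    static <E, R> Stream<R> combine(Stream<E> stream, BiFunction<E, E, R> combiner) {
        Spliterator<E> source = stream.spliterator();
        Spliterator<R> pairs = new Spliterators.AbstractSpliterator<R>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                List<E> buffer = new ArrayList<>(2);
                // Pull up to two elements from upstream, one tryAdvance call at a time
                while (buffer.size() < 2 && source.tryAdvance(buffer::add)) {
                    // keep pulling
                }
                if (buffer.size() < 2) {
                    return false; // a trailing unpaired element is dropped, as in the question
                }
                action.accept(combiner.apply(buffer.get(0), buffer.get(1)));
                return true;
            }
        };
        return StreamSupport.stream(pairs, false).onClose(stream::close);
    }

    public static void main(String[] args) {
        System.out.println(combine(Stream.of(1, 2, 3, 4, 5, 6), (a, b) -> "(" + a + "," + b + ")")
                .collect(Collectors.toList()));          // [(1,2), (3,4), (5,6)]
        // Works on an infinite source because pairs are only built on demand
        System.out.println(combine(Stream.iterate(1, i -> i + 1), Integer::sum)
                .limit(3).collect(Collectors.toList())); // [3, 7, 11]
    }
}
```

Creating the result stream as sequential (`false`) sidesteps the question of how to split pairs across threads, which is exactly the hard part for parallel streams.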

I did not understand your second example, but I bet it's doable as well.

Skipping elements based on more complicated logic than skip(long n) is possible in Java 9 via dropWhile and takeWhile, which take a Predicate as input.
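For illustration, a minimal sketch (Java 9 or later) that skips leading comment lines with `dropWhile` and stops at a terminator with `takeWhile`. The input data and the `#`/`END` convention are made up for the example. Note that `dropWhile` only drops a leading run: once the predicate fails, later matching elements are kept.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DropTakeWhile {
    // Drops leading "#" lines, then keeps lines until the first "END" marker (Java 9+).
    static List<String> body(Stream<String> lines) {
        return lines
                .dropWhile(line -> line.startsWith("#"))  // skip only while the predicate holds
                .takeWhile(line -> !line.equals("END"))   // stop at the first non-matching element
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(body(Stream.of("# header", "# more", "a", "# not skipped", "b", "END", "c")));
        // [a, # not skipped, b]
    }
}
```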

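Also note that your statement that none of the intermediate operations can do that is not accurate: sorted and distinct are stateful intermediate operations that may pull more elements from upstream than they pass on, and sorted in particular has to consume every element before it can emit the first one. There's also flatMap, which in Java 8 eagerly pushes an entire inner stream downstream even when a short-circuiting operation only needs part of it, but that is treated more like a bug.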
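A quick way to observe this in a sequential stream is to log elements with `peek` before and after `sorted` (the `up:`/`down:` labels are just for this demo). Every upstream element is pulled before `sorted` emits its first one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SortedBarrier {
    // Records "up:x" when an element leaves the source and "down:x" when it leaves sorted().
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
                .peek(x -> log.add("up:" + x))
                .sorted()
                .peek(x -> log.add("down:" + x))
                .forEach(x -> { });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
        // [up:3, up:1, up:2, down:1, down:2, down:3]
    }
}
```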

One more thing: in a parallel stream, intermediate operations are not invoked in any defined order, so such a stateful intermediate operation would see its elements in an unpredictable order. On the other hand, you always have the option to abuse things like:

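List<Something> list = Collections.synchronizedList(new ArrayList<>());
stream.map(x -> {
     list.add(x);   // side effect: remember every element that passes through
     // your mapping (must return the mapped value)
 })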

I would not do that if I were you, and I would think hard about whether I really need it, but just in case...
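To see the ordering problem in practice, here is a small sketch along the lines of the snippet above. The `Integer` elements and the doubling used as a stand-in for "your mapping" are assumptions made for the demo. The collected result still respects encounter order; only the side-effect log does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelSideEffects {
    // Doubles 0..n-1 in a parallel stream; returns the order in which map() saw the elements
    // and appends the collected (mapped) results to mappedOut.
    static List<Integer> observedOrder(int n, List<Integer> mappedOut) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        List<Integer> mapped = IntStream.range(0, n).boxed()
                .parallel()
                .map(x -> {
                    seen.add(x);    // side effect: order depends on thread scheduling
                    return x * 2;   // stand-in for "your mapping"
                })
                .collect(Collectors.toList());
        mappedOut.addAll(mapped);   // collect() still returns results in encounter order
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> mapped = new ArrayList<>();
        List<Integer> seen = observedOrder(20, mapped);
        System.out.println(mapped); // 0, 2, 4, ... always in encounter order
        System.out.println(seen);   // all 20 inputs, but usually not in encounter order
    }
}
```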

Eugene
  • I wouldn’t refer to `flatMap` in this context, it’s more confusing than clarifying. The answer works well without it. – Holger Nov 30 '17 at 13:29
  • Incredible! You've beaten @Holger on this one... Let's see if time keeps it this way... – fps Dec 01 '17 at 22:59
  • @FedericoPeraltaSchaffner I did not, and I don't think that would be possible at all. He is still pretty much the best I've seen around here. – Eugene Dec 01 '17 at 23:02
  • @FedericoPeraltaSchaffner: I’ve added an answer after the question had been substantially edited and another clarifying comment regarding the use case of one of the three examples in the question had been added. So my answer mostly addresses this additional information rather than the points already addressed by this answer and wouldn’t work as a stand-alone answer. I’m not even sure whether both answers together will satisfy the OP… – Holger Dec 04 '17 at 09:36
  • @Holger I know... Your answer is complementary to Eugene's. I have upvoted both, because I consider both of them useful. The question is tough, though, and I don't think it can be easily answered here. It is worth a whole series of blog posts... – fps Dec 04 '17 at 12:56
  • Indeed, @Holger and @Eugene gave complementary answers. I'm not sure which of the two I should accept as "the" answer because I really learned a lot from both of them. What I should remember is that basically everything can be implemented when using the `.spliterator()`. Too bad it cannot be done in a fluent way by default, but that [StreamEx](https://github.com/amaembo/streamex) library is very nice and even shows how to add this functionality yourself by wrapping the `Stream`. – neXus Dec 07 '17 at 16:12

Not every terminal operation will “pull all the values through the stream”. The terminal operations iterator() and spliterator() do not immediately fetch all values and allow to do lazy processing, including constructing a new Stream again. For the latter, it’s strongly recommended to use spliterator() as this allows more meta information to be passed to the new stream and also implies less wrapping of objects.
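A minimal sketch of that lazy pattern: pull one element by hand with `tryAdvance`, then hand the rest of the same `Spliterator` to `StreamSupport.stream`. The "header line" scenario and the `withHeaderPrefix` name are hypothetical. This also covers the question's third example, since arbitrary skipping logic can run on the spliterator before the new stream is built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ResumeFromSpliterator {
    // Reads the first element by hand (e.g. a header), then continues lazily with a new Stream.
    static Stream<String> withHeaderPrefix(Stream<String> lines) {
        boolean parallel = lines.isParallel();
        Spliterator<String> sp = lines.spliterator();
        List<String> header = new ArrayList<>(1);
        sp.tryAdvance(header::add);   // pulls at most one element from upstream
        String prefix = header.isEmpty() ? "" : header.get(0) + ":";
        // Nothing else has been fetched yet; the new stream pulls the rest on demand
        return StreamSupport.stream(sp, parallel)
                .map(s -> prefix + s)
                .onClose(lines::close);
    }

    public static void main(String[] args) {
        System.out.println(withHeaderPrefix(Stream.of("id", "1", "2"))
                .collect(Collectors.toList()));   // [id:1, id:2]
        // Works on an infinite source because nothing is consumed up front
        System.out.println(withHeaderPrefix(Stream.iterate(0, i -> i + 1).map(Object::toString))
                .limit(2).collect(Collectors.toList()));   // [0:1, 0:2]
    }
}
```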

E.g. your second example could be implemented as

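import java.util.Spliterator;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> replaceWhenEmpty(Stream<T> s, Supplier<Stream<T>> fallBack) {
    boolean parallel = s.isParallel();
    Spliterator<T> sp = s.spliterator();
    Stream.Builder<T> firstElement;
    // Known to be empty, or no first element could be fetched: use the fallback instead
    if(sp.getExactSizeIfKnown()==0 || !sp.tryAdvance(firstElement=Stream.builder())) {
        s.close();
        return fallBack.get();
    }
    // Put the already fetched first element back in front of the untouched remainder
    return Stream.concat(firstElement.build(), StreamSupport.stream(sp, parallel))
                 .onClose(s::close);
}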

For your general question, I don’t see what a general abstraction of these examples would look like, other than the spliterator() method that already exists. As the documentation puts it

However, if the provided stream operations do not offer the desired functionality, the BaseStream.iterator() and BaseStream.spliterator() operations can be used to perform a controlled traversal.
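To connect this back to the question's `mapMultiple` idea, here is a minimal sketch of such a controlled traversal, assuming a sequential stream. A hypothetical `combiner` receives an `Iterator` over the remaining upstream elements and decides for itself how many to consume per output element. It is a `Function<Iterator<E>, R>` rather than a `Consumer`, since it has to produce the combined element. `combineWith` and the length-prefixed record format are illustrative, not a JDK API.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ReverseFlatMap {
    // Each downstream request calls the combiner once; the combiner pulls as many upstream
    // elements as it needs (it must take at least one, or the stream never ends).
    static <E, R> Stream<R> combineWith(Stream<E> stream, Function<Iterator<E>, R> combiner) {
        Iterator<E> upstream = stream.iterator();
        Spliterator<R> out = new Spliterators.AbstractSpliterator<R>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super R> action) {
                if (!upstream.hasNext()) {
                    return false; // upstream exhausted: end of the combined stream
                }
                action.accept(combiner.apply(upstream));
                return true;
            }
        };
        return StreamSupport.stream(out, false).onClose(stream::close);
    }

    public static void main(String[] args) {
        // Length-prefixed records: 2 -> [10, 20], 3 -> [1, 1, 1]; each record is summed
        List<Integer> sums = combineWith(Stream.of(2, 10, 20, 3, 1, 1, 1), it -> {
            int n = it.next();
            int sum = 0;
            for (int i = 0; i < n && it.hasNext(); i++) {
                sum += it.next();
            }
            return sum;
        }).collect(Collectors.toList());
        System.out.println(sums); // [30, 3]
    }
}
```

Going through a plain `Iterator` keeps the sketch short, but it discards size information and forces sequential processing, which is the parallelism concern raised in the comments on the question.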

Holger