9

How can I check whether a stream instance has been consumed (meaning a terminal operation has already been called on it, so that any further call to a terminal operation may fail with IllegalStateException: stream has already been operated upon or closed)?

Ideally, I want a method that does not consume the stream if it has not yet been consumed, and that returns false if the stream has already been consumed, without catching an IllegalStateException from a stream method (using exceptions for control flow is expensive and error-prone, in particular with standard exceptions).

I am looking for something similar to hasNext() in Iterator, in that it returns a boolean instead of throwing an exception (though without the contract to next()).

Example:

public void consume(java.util.function.Consumer<Stream<?>> consumer, Stream<?> stream) {
   consumer.accept(stream);
   // defensive programming, check state
   if (...) {
       throw new IllegalStateException("consumer must call terminal operation on stream");
   }
}

The goal is to fail early if client code calls this method without consuming the stream.

It seems there is no method to do that and I'd have to add a try-catch block calling any terminal operation like iterator(), catch an exception and throw a new one.

An acceptable answer can also be "No solution exists", with a good justification of why the specification could not add such a method (if a good justification exists). It seems that the JDK streams usually have this snippet at the start of their terminal methods:

// in AbstractPipeline.java
if (linkedOrConsumed)
    throw new IllegalStateException(MSG_STREAM_LINKED);

So for those streams, an implementation of such a method would not seem that difficult.
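The effect of that guard can be observed from the outside. The following is a minimal sketch, assuming the standard JDK stream implementation (the specification only says a stream *may* throw on reuse); the class name `StreamReuseDemo` and the helper `throwsIllegalState` are made up for illustration. It also shows that the flag covers linking: chaining an intermediate operation marks the original stream as used, even if no terminal operation ever runs.

```java
import java.util.stream.Stream;

public class StreamReuseDemo {

    /** Hypothetical helper: runs the action and reports whether it threw IllegalStateException. */
    static boolean throwsIllegalState(Runnable action) {
        try {
            action.run();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Case 1: a terminal operation was already called.
        Stream<String> consumed = Stream.of("a", "b");
        consumed.count();
        System.out.println("reuse after terminal op throws: "
                + throwsIllegalState(consumed::count));

        // Case 2: only an intermediate operation was chained; the result is discarded.
        Stream<String> linked = Stream.of("a", "b");
        linked.map(String::length);
        System.out.println("reuse after linking throws: "
                + throwsIllegalState(linked::count));
    }
}
```

With JDK streams both cases are expected to report an exception, so a hypothetical `isConsumed()` built on this flag would really answer "linked or consumed", not strictly "a terminal operation ran".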

tkruse
  • It seems there is no way; not even the IllegalStateException is guaranteed to be thrown, according to the discussion [here](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html). I wonder what `iterator.hasNext()` will do, however. – President James K. Polk Jul 08 '19 at 02:06
  • Don't pass around streams, pass stream providers. It is similar to passing an iterator instead of iterable and trying to iterate twice. – LeffeBrune Jul 08 '19 at 02:12
  • Can you specify more details about what is consumer, what stands exactly for "stream was consumed" and what you would like to achieve in overall. Looks like this is not clear from what you've already written. – Boris Jul 08 '19 at 02:17
  • @LeffeBrune: I pass around stream consumers, not streams. – tkruse Jul 08 '19 at 02:28
  • @Boris, I added more detail. But the question is generic, it applies to different situations when a stream supplier or consumer is passed around, and the program should fail early when they get called but the stream they provide or consume is not actually being used. – tkruse Jul 08 '19 at 02:29
  • @tkruse Could you state an example to explain what you mean by *"but the stream they provide or consume is not actually being used"*? From what can be inferred, it seems like you are working with `Consumer` and not `Stream` directly, and once you call the `accept` method, you want to make sure that it actually used the stream provided. Is that the case, and why (or when) would the code within such a `Consumer` not really consume the stream? – Naman Jul 08 '19 at 03:17
  • Typically it would be a bug if a consumer did not consume the stream. And yes, it can be inferred at the caller's site, but I want to check it in the consume method above, which has no knowledge of what the client will do with the stream, other than wanting to enforce that clients actually call a terminal operation on the stream. But the same method could also be applied if a part of the code does not know whether a stream has already been closed or consumed, to prevent trying to consume it again, without catching all IllegalStateExceptions and filtering by JVM message detail. – tkruse Jul 08 '19 at 04:10
  • To motivate: The similar iterator interface has a hasNext() and a next() method, I can call hasNext() without exception to check whether it has another element. Streams do not have a hasNext() method, but it seems they also have no other method to prevent an exception before calling a terminal method a second time. Or to check whether it has been consumed without a potential exception. – tkruse Jul 08 '19 at 04:16
  • Why do you not know if the stream has been consumed or not? What use is the stream once it has been consumed? Why would you keep a reference to a stream that has been consumed? I can't yet think of a use case for this problem. – CryptoFool Jul 08 '19 at 05:03
  • A stream is of no use once consumed. But the information of whether it has been consumed or not can be of use. My use-case is just defensive programming, asserting a post-condition. If you would like a functionally useful use-case, you can consider the situation where a collection of stream is created, then some of them get consumed until a condition is met, but some code wants to continue to consume all streams that have not yet been consumed. – tkruse Jul 08 '19 at 05:19
  • But for your use case, why not just remove the stream from the collection once it's been consumed. I understand the abstract idea, but I wonder if you're really making up a problem where one doesn't exist. It seems to me that the answer is to just somehow track that a stream has been consumed, often by simply forgetting its reference since it's no longer useful. - sometimes, there's a reason some feature isn't available, like in this case, knowing if a stream has been consumed. Maybe it's the case that you never need to retain a reference to a consumed stream... – CryptoFool Jul 08 '19 at 05:32
  • ...this seems like a nearly identical case to a file handle. When you're done with a file, you close its handle and forget about it. We've been using files and holding on to file handles for just long enough for years and years. Does it ever come up that you have a file handle that you have to ask if is already closed or not? I can't recall ever needing to know that. I think it's the same case here... – CryptoFool Jul 08 '19 at 05:35
  • ...I'm not saying that there isn't a logical use cases for this. I just can't think of one, and would be really interested in knowing if there is one. – CryptoFool Jul 08 '19 at 05:38
  • @tkruse why not pass around Consumer> so that you can get the stream before consuming it? – LeffeBrune Jul 08 '19 at 08:13
  • @LeffeBrune, in standard JDK, that would be Consumer>, I assume. That's creative, a stream could be locally created and consumed in the call site code. But it's not an answer to this question. – tkruse Jul 08 '19 at 09:20
  • Well ... you could do some reflection hacks there. I just tried this, and it seems to be possible, but doubt that it is worth it.... – Marco13 Jul 08 '19 at 11:10
  • @Marco13 - it is also unwise ... because it ties your application code to a *specific* implementation of the classes that you are introspecting. If the internals of the stream class change in a future version of Java, your application breaks. – Stephen C Aug 04 '19 at 04:18

3 Answers

4

Taking into consideration that spliterator (for example) is a terminal operation, you can simply create a method like:

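// Needs java.util.Optional, java.util.Spliterator,
// java.util.stream.Stream and java.util.stream.StreamSupport.
private static <T> Optional<Stream<T>> isConsumed(Stream<T> stream) {

    Spliterator<T> spliterator;
    try {
        // spliterator() is a terminal operation; JDK streams throw here if already used
        spliterator = stream.spliterator();
    } catch (IllegalStateException ise) {
        // already consumed: nothing usable to hand back
        return Optional.empty();
    }

    // the probe has used up the original stream, so return an equivalent one
    return Optional.of(StreamSupport.stream(
        () -> spliterator,
        spliterator.characteristics(),
        stream.isParallel()));
}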

I don't know of a better way to do it... And usage would be:

Stream<Integer> ints = Stream.of(1, 2, 3, 4)
                                 .filter(x -> x < 3);

YourClass.isConsumed(ints)
         .ifPresent(x -> x.forEachOrdered(System.out::println));

Since I don't think there is a practical reason to return an already consumed Stream, I am returning Optional.empty() instead.
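For the post-condition check from the question, the stream is not needed afterwards, so the probe can be reduced to a boolean. The following is a sketch under the assumption that the stream is a standard JDK stream that throws IllegalStateException on reuse; the class name `ConsumeCheck` and the helper `isLinkedOrConsumed` are hypothetical names, not JDK API.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

public class ConsumeCheck {

    /**
     * Hypothetical helper: true if the stream rejects a further terminal operation.
     * Note that on an unused stream the probe itself uses the stream up.
     */
    static boolean isLinkedOrConsumed(Stream<?> stream) {
        try {
            stream.iterator(); // terminal operation used only as a probe
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** The consume method from the question, with the probe as post-condition. */
    static <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
        consumer.accept(stream);
        if (!isLinkedOrConsumed(stream)) {
            throw new IllegalStateException("consumer must call terminal operation on stream");
        }
    }

    public static void main(String[] args) {
        consume(s -> s.forEach(System.out::println), Stream.of(1, 2, 3)); // passes
        try {
            consume(s -> { /* forgets to use the stream */ }, Stream.of(1, 2, 3));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One limitation of this sketch: a consumer that only chains an intermediate operation (for example `s -> s.map(x -> x)`) also passes the check, because JDK streams mark the original stream as used as soon as another stage is linked to it.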

Eugene
  • Sure, I was aware of this solution and mentioned it in the question. It is not the approach I am looking for because it uses Exceptions for control flow (and creating exceptions is expensive and slow due to the Stacktrace creation). But if no other answer is found in a couple of days, I may accept this answer. – tkruse Jul 08 '19 at 09:25
  • @tkruse You are talking about some nanoseconds of overhead in the context of using `Stream` which is easily 10x slower than a simple for loop - caring about [some ns](https://shipilev.net/blog/2014/exceptional-performance/) here is over the top to say the least – roookeee Jul 15 '19 at 15:15
  • Fair enough. The other concern is that only the IllegalStateException caused by the stream having already been consumed should be caught; other ones may need to be rethrown. – tkruse Jul 15 '19 at 23:28
  • @tkruse as far as I can tell, there would be no other option... I wish someone would come up with another interesting way to do it, but it seems not possible. – Eugene Jul 31 '19 at 20:14
1

One solution could be to add an intermediate operation (e.g. filter()) to the stream before passing it to the consumer. In that operation you do nothing except record that the operation was called (e.g. with an AtomicBoolean):

public <T> void consume(Consumer<Stream<T>> consumer, Stream<T> stream) {
    AtomicBoolean consumed = new AtomicBoolean(false);
    consumer.accept(stream.filter(i -> {
        consumed.set(true);
        return true;
    }));
    if (!consumed.get()) {
        throw new IllegalStateException("consumer must call terminal operation on stream");
    }
}

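Side Note: Do not use peek() for this, because its action may be skipped when the implementation can avoid producing elements, e.g. with short-circuiting terminal operations (like findAny()) or, since Java 9, with count() on a sized source.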
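A caveat worth checking before relying on this: the flag is set by an element passing through the filter, not by the terminal operation itself. The sketch below (class and method names are made up for illustration) runs the same marker filter against three cases to show what the flag does and does not detect.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FilterFlagDemo {

    /** Returns whether the marker filter saw at least one element while the consumer ran. */
    static <T> boolean sawElement(Stream<T> stream, Consumer<Stream<T>> consumer) {
        AtomicBoolean flag = new AtomicBoolean(false);
        consumer.accept(stream.filter(x -> {
            flag.set(true); // only reached when an element flows through
            return true;
        }));
        return flag.get();
    }

    public static void main(String[] args) {
        // Terminal operation on a non-empty stream: flag is set.
        System.out.println(sawElement(Stream.of(1, 2, 3), s -> s.forEach(x -> { })));
        // No terminal operation at all: flag stays unset, as intended.
        System.out.println(sawElement(Stream.of(1, 2, 3), s -> { }));
        // Terminal operation on an empty stream: flag also stays unset.
        System.out.println(sawElement(Stream.<Integer>empty(), s -> s.forEach(x -> { })));
    }
}
```

The last case means the check in consume() would reject a well-behaved consumer whenever the stream happens to be empty, so this approach only fits situations where the stream is known to contain at least one element.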

tkruse
Samuel Philipp
  • it would have been great if you at least made this a method that would compile... but anyway, this is not correct. even if you made it compile that `filter` will set the `consumed` flag only when there is _at least_ one element in the stream, this would not solve the OP's problem about a stream being `consumed` – Eugene Jul 31 '19 at 20:24
  • It is a good answer when there is at least one element in the stream, though. – tkruse Aug 04 '19 at 04:13
0

Here is a standalone compilable solution that uses a delegating custom Spliterator<T> implementation + an AtomicBoolean to accomplish what you seek without losing thread-safety or affecting the parallelism of a Stream<T>.

The main entry is the Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) function - you can do whatever you want in the callback function. I first tinkered with a delegating Stream<T> implementation but it's just too big an interface to delegate without any issues (see my code comment, even Spliterator<T> has its caveats when delegating):

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StackOverflowQuestion56927548Scratch {

    private static class TrackingSpliterator<T> implements Spliterator<T> {
        private final AtomicBoolean tracker;
        private final Spliterator<T> delegate;
        private final Runnable callback;

        public TrackingSpliterator(Stream<T> forStream, Runnable callback) {
            this(new AtomicBoolean(true), forStream.spliterator(), callback);
        }

        private TrackingSpliterator(
                AtomicBoolean tracker,
                Spliterator<T> delegate,
                Runnable callback
        ) {
            this.tracker = tracker;
            this.delegate = delegate;
            this.callback = callback;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean advanced = delegate.tryAdvance(action);
            if(tracker.compareAndSet(true, false)) {
                callback.run();
            }
            return advanced;
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = this.delegate.trySplit();
            //may return null according to JavaDoc
            if(split == null) {
                return null;
            }
            return new TrackingSpliterator<>(tracker, split, callback);
        }

        @Override
        public long estimateSize() {
            return delegate.estimateSize();
        }

        @Override
        public int characteristics() {
            return delegate.characteristics();
        }
    }

    public static <T> Stream<T> track(Stream<T> input, Consumer<Stream<T>> callback) {
        return StreamSupport.stream(
                new TrackingSpliterator<>(input, () -> callback.accept(input)),
                input.isParallel()
        );
    }

    public static void main(String[] args) {
        //some big stream to show it works correctly when parallelized
        Stream<Integer> stream = IntStream.range(0, 100000000)
                .mapToObj(Integer::valueOf)
                .parallel();
        Stream<Integer> trackedStream = track(stream, s -> System.out.println("consume"));

        //dummy consume
        System.out.println(trackedStream.anyMatch(i -> i.equals(-1)));
    }
}

Just return the stream produced by the track function, maybe adapt the callback parameter's type (you probably don't need to pass the stream), and you are good to go.

Please note that this implementation only tracks when elements are actually pulled from the stream. Calling .count() on a Stream that was produced by e.g. IntStream.range(0, 1000) (without any filter steps etc.) will not traverse the stream but return its known length via Spliterator<T>.estimateSize()!
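The count() shortcut can be made visible with a small probe. This is a sketch only: the class name `CountShortcutDemo` is made up, and whether the pipeline is skipped depends on the Java version (the optimization is documented for count() from Java 9 on, while Java 8 traverses the elements).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class CountShortcutDemo {

    /** Returns {count, number of elements that actually passed through peek()}. */
    static long[] countWithProbe(int size) {
        AtomicInteger seen = new AtomicInteger();
        long count = IntStream.range(0, size)
                .peek(i -> seen.incrementAndGet()) // runs only if elements are traversed
                .count();
        return new long[] { count, seen.get() };
    }

    public static void main(String[] args) {
        long[] result = countWithProbe(1000);
        // The second number shows whether the pipeline was executed on this JVM.
        System.out.println("count = " + result[0] + ", elements traversed = " + result[1]);
    }
}
```

If the second number is 0, the terminal operation completed without pulling a single element, which is exactly the situation in which a spliterator-based tracker never fires.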

roookeee
  • I don't think that this is correct. `boolean advanced = delegate.tryAdvance(action);` this does not check if a Stream is consumed, but rather if it's empty. For example: `Stream empty = Stream.empty(); Stream wrapped = track(empty, x -> System.out.println("x = " + x)); wrapped.forEach(System.out::println);` Also, if this condition happened (though it's not the correct thing, as I said above): `if (tracker.compareAndSet(true, false)) {`, you should really be stopping every subsequent operation, right? – Eugene Jul 31 '19 at 20:11
  • I mean what should this show? `Stream s = Stream.concat(Stream.empty(), Stream.of(1, 2, 3, 4)) .filter(x -> x > 0); Stream wrapped = track(s, x -> System.out.println("consume")); wrapped.forEachOrdered(System.out::println);` – Eugene Jul 31 '19 at 20:11
  • I don't check for `boolean advanced = delegate.tryAdvance(action);`, I just call the delegate, so I can't quite follow. The question was whether the stream was consumed, which is the case once `tryAdvance` was called, so it doesn't matter if it was empty or not; it was consumed once `tryAdvance` was called at least once. The example of your first comment correctly prints once. Your second example calls the callback exactly once too (after the first element was consumed). I don't understand your remark about `tracker.compareAndSet`; I just use it to guarantee the callback is only called once – roookeee Jul 31 '19 at 21:05
  • Regarding your second comment: you only track the outer stream, so it naturally only prints once. One could wrap the inner streams of `Stream.concat` if further tracking is needed – roookeee Jul 31 '19 at 21:07
  • What do you think will happen when you call `spliterator()` on an already consumed stream? That was my point about `tryAdvance` in your code checking for emptiness. – Eugene Aug 01 '19 at 09:39
  • Where do I check emptiness? I just call `tryAdvance` on the delegate but don't check the result at all. Passing an already consumed stream to `track` of course doesn't work, but that's not the use case here (as I see it) – roookeee Aug 01 '19 at 16:41
  • as _I_ see it, a consumed stream is a stream that has already been consumed by a terminal operation and the OP seems to me to make it very clear in his/her very first statement of the question – Eugene Aug 01 '19 at 16:43
  • I see, you are right. Don't know why I thought he wanted what I have implemented. Will let this answer here in case someone needs something alike in the case one can control the creation of the to-be-tracked streams :) – roookeee Aug 01 '19 at 20:31
  • you are right about leaving it here IMO, I actually enjoyed understanding the idea behind it. – Eugene Aug 01 '19 at 20:32