Even though I am using a Supplier for my streams and calling Supplier.get() each time I want to retrieve my stream and perform a terminal operation on it, I am still getting the "stream has already been operated upon or closed" exception. Could anyone please look at my code and suggest what I am doing wrong?

Method where exception is being thrown:

private static void printMyDetails(Supplier<Stream<Result>> mySupplier, String myStatus) {
        checkNotNull(mySupplier);
        checkArgument(isNotEmpty(myStatus), "Invalid My status");

        if (mySupplier.get().noneMatch(result -> true)) { //<-This is where the exception is thrown
            if (log.isInfoEnabled()) {
                log.info("Number of My with status '{}': {}", myStatus, 0);
            }
        } else {
            log.info("Number of My with status '{}': {}", myStatus, mySupplier.get().count());
            log.info("Details of My(s) with status: '{}'", myStatus);
            mySupplier.get().sorted().forEach(Utils::printMyNameAndDetails);
        }
    }

Place which is calling the above method:

rb.keySet().stream().filter(key -> containsIgnoreCase(key, "status")).map(rb::getString)
                .filter(StringUtils::isNotEmpty).forEach(status -> {
            var resultsWithExpectedStatusSupplier = requireNonNull(getResultsWithExpectedStatusSupplier(results, status));
            resultsWithExpectedStatusSupplier.ifPresentOrElse(streamSupplier -> printMyDetails(streamSupplier, status), () -> {
                if (log.isInfoEnabled())
                    log.info("0 My with status: {}", status);
            });
        });

The stream supplier:

private static Optional<Supplier<Stream<Result>>> getResultsWithExpectedStatusSupplier(
            @NotNull List<Result> results, @NotNull String expectedStatus) {
        checkArgument(!results.isEmpty(), "Results list is empty");
        checkArgument(isNotEmpty(expectedStatus), "Invalid expected status");

        var resultStreamWithExpectedStatus = requireNonNull(results.stream().filter(result -> ofNullable(result).map(Result::getStatus)
                .allMatch(s -> isNotEmpty(s) && s.equalsIgnoreCase(expectedStatus))));
        return resultStreamWithExpectedStatus.count() == 0 ? Optional.empty() : Optional.of(() -> resultStreamWithExpectedStatus);
    }
Naman
ltcolumb
  • Christian Ullenboom already provided the general answer. Your fix will most likely be in `getResultsWithExpectedStatusSupplier`, so you might want to provide that code too (I guess that's where an `Optional<Supplier<Stream<Result>>>` is created with a supplier that always returns the same stream). – sfiss Nov 05 '19 at 11:16
  • Apart from that, it seems strange that you wrap the value inside `Optional` into a stream using `Stream.ofNullable()`, your optional value should not contain `null` anyway. – sfiss Nov 05 '19 at 11:17
  • Thanks, I've also added the getResultsWithExpectedStatusSupplier code to my post now. – ltcolumb Nov 05 '19 at 12:20
  • @sfiss Your second point is very valid; I've changed that as well now (i.e. I no longer use `Stream.ofNullable` on the Optional's value). – ltcolumb Nov 05 '19 at 12:27
  • What is the implementation of the checkNotNull method? I hope you are not reading the stream there. – Ouney Nov 05 '19 at 12:41
  • @Ouney checkNotNull is from Guava's `Preconditions` class; sorry this wasn't clear. I'm using the static import `import static com.google.common.base.Preconditions.*;` – ltcolumb Nov 05 '19 at 12:48
  • I have given a more concrete answer to the specific problem instance. The main problem is that you used one of the stream's terminal operations (`count`) and still want to traverse the stream afterwards. A similar problem can be seen here: https://stackoverflow.com/questions/38044849/is-possible-to-know-the-size-of-a-stream-without-using-a-terminal-operation/ – sfiss Nov 05 '19 at 12:58
  • Your stream is already consumed when you evaluate `resultStreamWithExpectedStatus.count() == 0`, so the fact that the supplier doesn't return a new stream each time is another problem that didn't even have the chance to materialize yet. Though there is no sense in having all these repeated checks for empty streams in your code anyway. – Holger Nov 05 '19 at 15:02

2 Answers

You can consume a Stream just once. It looks like the Supplier is always giving the same Stream again and again. After the first terminal operation the Stream is drained; the Stream from the Supplier has to be a new Stream all the time.
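
To illustrate the difference, here is a minimal sketch. The class and method names (`FreshStreamSupplier`, `freshSupplier`, `sharedSupplier`, `secondUseFails`) are made up for this example and are not from the question's code. One supplier builds a new stream on each `get()`, while the other captures a single stream instance and keeps handing it out:

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FreshStreamSupplier {

    // Builds a new pipeline on every get(), so each terminal operation has its own stream.
    static Supplier<Stream<String>> freshSupplier(List<String> source) {
        return () -> source.stream().filter(s -> !s.isEmpty());
    }

    // Captures one stream instance; every get() returns that same instance.
    static Supplier<Stream<String>> sharedSupplier(List<String> source) {
        Stream<String> single = source.stream().filter(s -> !s.isEmpty());
        return () -> single;
    }

    // Runs two terminal operations and reports whether the second one fails.
    static boolean secondUseFails(Supplier<Stream<String>> supplier) {
        supplier.get().count();
        try {
            supplier.get().count();
            return false;
        } catch (IllegalStateException e) {
            // "stream has already been operated upon or closed"
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "", "b");
        System.out.println(secondUseFails(freshSupplier(data)));  // false
        System.out.println(secondUseFails(sharedSupplier(data))); // true
    }
}
```

Wrapping a stream in a Supplier only helps when the lambda itself creates the stream; a lambda that returns a previously created variable behaves like the second case.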

Christian Ullenboom

The general problem is as Christian Ullenboom said: The stream has already been consumed. The exact location in your code is the call to resultStreamWithExpectedStatus.count() in the method getResultsWithExpectedStatusSupplier, as Stream.count is a reduction/terminal operation which consumes the stream.

As stated in e.g. this answer, you cannot get the stream's size without consuming it. You can fix this by storing the filtered items in a collection (e.g. with Collectors.toList), querying the size there, and returning the collection itself rather than the stream.

On a side note, I think you misuse Optional a bit too much. The code could be simpler passing empty streams (or even better: pass an empty, filtered collection).
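
A rough sketch of what that could look like, assuming a simplified stand-in for your `Result` class (the `Result` type, `withStatus`, and `FilterToList` below are hypothetical names for illustration, not your actual code). The filter method returns a possibly empty list, so neither Optional nor Supplier is needed:

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class FilterToList {

    // Hypothetical, simplified stand-in for the question's Result type.
    static final class Result {
        private final String name;
        private final String status;

        Result(String name, String status) {
            this.name = name;
            this.status = status;
        }

        String getName() { return name; }
        String getStatus() { return status; }
    }

    // Materializes the matches once; the returned list can be sized and re-streamed freely.
    static List<Result> withStatus(List<Result> results, String expectedStatus) {
        return results.stream()
                .filter(Objects::nonNull)
                // equalsIgnoreCase(null) is false, so results without a status are skipped
                .filter(r -> expectedStatus.equalsIgnoreCase(r.getStatus()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
                new Result("b", "DONE"),
                new Result("a", "done"),
                new Result("c", "FAILED"));

        List<Result> done = withStatus(results, "done");
        System.out.println("Number with status 'done': " + done.size());
        // A fresh stream is created from the list each time one is needed.
        done.stream().map(Result::getName).sorted().forEach(System.out::println);
    }
}
```

An empty list already expresses "no matches", so the caller can simply check `isEmpty()` or log a size of 0.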

sfiss
  • Thank you, yes, this was the precise problem; I'm sorted now. Everyone's suggestions were helpful, especially Christian's, but I marked this one as the answer as it was more precise and detailed. – ltcolumb Nov 05 '19 at 16:40
  • P.S. Could you or someone else also please explain a better alternative to the repeated null checks I'm doing via Optional and requireNonNull? (Or even better, is there a static analysis tool which can spot these issues? I'm using SonarLint+FindBugs, but haven't had any warnings or suggestions for this.) Also, is there any best practice on when to return a Stream or a Collection, i.e. should the callee return a Collection and let the caller turn it into a Stream when it needs to perform data operations, or is it better that the callee itself returns the Stream? Currently I've done the latter. – ltcolumb Nov 05 '19 at 16:40
  • You could look [here](https://stackoverflow.com/questions/24676877/should-i-return-a-collection-or-a-stream) regarding the last question. Personally, I tend to return finite collections from most outward-facing methods/APIs and use Stream internally, e.g. in private methods. Also, I tend not to do too much defensive programming (checking arguments is at most required in `public` methods, since you control the calls to your `private` methods). – sfiss Nov 05 '19 at 17:00
  • In your case, you could probably group your results in a `Map<String, List<Result>>`, then loop over the `status` values and print the size plus details. That seems much more straightforward than what you are doing. – sfiss Nov 05 '19 at 17:03
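
The grouping idea from the last comment could be sketched roughly as follows. This is only an illustration under assumptions: `Result` is a simplified hypothetical stand-in, `byStatus` and `GroupByStatus` are made-up names, and lower-casing the key is one possible way to mimic the question's case-insensitive comparison.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByStatus {

    // Hypothetical, simplified stand-in for the question's Result type.
    static final class Result {
        private final String name;
        private final String status;

        Result(String name, String status) {
            this.name = name;
            this.status = status;
        }

        String getName() { return name; }
        String getStatus() { return status; }
    }

    // Groups results by lower-cased status in a single pass over the list.
    static Map<String, List<Result>> byStatus(List<Result> results) {
        return results.stream()
                // groupingBy rejects null keys, so drop results without a usable status
                .filter(r -> r != null && r.getStatus() != null && !r.getStatus().isEmpty())
                .collect(Collectors.groupingBy(r -> r.getStatus().toLowerCase(Locale.ROOT)));
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
                new Result("b", "DONE"),
                new Result("a", "done"),
                new Result("c", "FAILED"));
        Map<String, List<Result>> grouped = byStatus(results);

        // Loop over the expected statuses; a missing key simply means zero matches.
        for (String status : List.of("done", "failed", "skipped")) {
            List<Result> matches = grouped.getOrDefault(status.toLowerCase(Locale.ROOT), List.of());
            System.out.println("Number with status '" + status + "': " + matches.size());
            matches.stream().map(Result::getName).sorted().forEach(System.out::println);
        }
    }
}
```

With this shape the results are traversed once, and the per-status lookup needs no stream reuse, Optional, or Supplier.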