There is a question on whether Java methods should return Collections or Streams, in which Brian Goetz answers that even for finite sequences, Streams should usually be preferred.

But it seems to me that currently many operations on Streams that come from other places cannot be safely performed, and defensive code guards are not possible because Streams do not reveal if they are infinite or unordered.

If parallelism were a problem for the operations I want to perform on a Stream, I can call isParallel() to check, or sequential() to make sure the computation is sequential (if I remember to).

But if orderedness or finiteness (sizedness) were relevant to the safety of my program, I cannot write safeguards.

Assuming I consume a library implementing this fictitious interface:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:

    // finite, ordered, sequential
    // IntStream.range(0, 100).boxed()

    // infinite, unordered, sequential
    // final AtomicInteger atomic = new AtomicInteger();
    // Stream.generate(() -> atomic.incrementAndGet())

    // infinite, unordered, parallel
    // Stream.generate(() -> atomic.incrementAndGet()).parallel()

    // finite, ordered, sequential, should-be-closed
    // Files.lines(Paths.get("coordinates.txt")).map(Integer::parseInt)
}

Then what operations can I safely call on this stream to write a correct algorithm?

It seems that if I want to write the elements to a file as a side effect, I need to be concerned about the stream being parallel:

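// if the stream is parallel, in which order will elements be written to the file?
// (note: since Java 9, count() may skip the peek() action when the size is known from the source)
coordinates().peek(i -> writeToFile(i)).count();
// how should I remember to always add sequential() in such cases?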
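
One possible way to make the write order independent of how the library configured the stream is forEachOrdered, which processes elements one at a time in encounter order, even on a parallel stream (provided the stream has an encounter order at all). A minimal sketch, where `OrderedWrite` and the in-memory `sink` list standing in for the file are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class OrderedWrite {
    // Hypothetical stand-in for writeToFile: appends to an in-memory list.
    static List<Integer> writeAll(Stream<Integer> coordinates) {
        List<Integer> sink = new ArrayList<>();
        // forEachOrdered handles one element at a time, in encounter order,
        // regardless of whether the stream is parallel.
        coordinates.forEachOrdered(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        Stream<Integer> parallel = IntStream.range(0, 100).boxed().parallel();
        System.out.println(writeAll(parallel));
    }
}
```

This does not help with an unordered source such as Stream.generate, where there is no encounter order to respect.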

And if it is parallel, which thread pool does it run on?

If I want to sort the stream (or other non-short-circuit operations), I somehow need to be cautious about it being infinite:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

I can impose a limit before sorting, but which magic number should that be, if I expect a finite stream of unknown size?

Finally, maybe I want to compute in parallel to save time and then collect the result:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

But if the stream is not ordered (in that version of the library), then the result might become mangled due to the parallel processing. But how can I guard against this, other than not using parallel (which defeats the performance purpose)?
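
For the ordered case at least, collect(toList()) keeps the encounter order even when the pipeline runs in parallel; only for unordered sources is the resulting order unspecified. A small sketch of the ordered case, where `ParallelOrderDemo` and this trivial `complexLookup` are hypothetical stand-ins:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelOrderDemo {
    // Hypothetical stand-in for an expensive lookup.
    static int complexLookup(int i) {
        return i * 2;
    }

    static List<Integer> lookupAll(boolean parallel) {
        IntStream source = IntStream.range(0, 1_000); // ordered source
        if (parallel) {
            source = source.parallel();
        }
        return source.map(ParallelOrderDemo::complexLookup)
                     .boxed()
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same list either way, because the source is ordered.
        System.out.println(lookupAll(false).equals(lookupAll(true))); // prints true
    }
}
```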

Collections are explicit about being finite or infinite, about having an order or not, and they do not carry the processing mode or threadpools with them. Those seem like valuable properties for APIs.

Additionally, Streams may sometimes need to be closed, but most commonly they do not. If I consume a stream from a method (or from a method parameter), should I generally call close?
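
Since Stream is AutoCloseable and closing a stream without close handlers does nothing, one defensive option is to always consume a foreign stream inside try-with-resources. A minimal sketch (the class `CloseDemo` and its method name are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class CloseDemo {
    // Consumes the stream and closes it afterwards; returns the element count.
    static long countAndClose(Stream<?> stream) {
        // Stream implements AutoCloseable, so try-with-resources applies.
        try (Stream<?> s = stream) {
            return s.count();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        // onClose registers a handler, similar to what Files.lines does internally.
        Stream<Integer> stream = Stream.of(1, 2, 3).onClose(() -> closed.set(true));
        System.out.println(countAndClose(stream) + " " + closed.get()); // prints "3 true"
    }
}
```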

Also, a stream might already have been consumed, and it would be good to be able to check for that so the case can be handled gracefully.
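
As far as I can tell there is no public method to ask a stream whether it has been used. The only signal is the IllegalStateException that the JDK implementation throws on reuse (the specification only says an implementation "may" throw it). A sketch under that assumption, with the hypothetical helper `ConsumedCheck.tryOpen`:

```java
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;

class ConsumedCheck {
    // Returns the stream's spliterator, or empty if the stream was already
    // operated upon or closed. spliterator() is itself a terminal operation,
    // so the original stream must not be used after this call.
    static <T> Optional<Spliterator<T>> tryOpen(Stream<T> stream) {
        try {
            return Optional.of(stream.spliterator());
        } catch (IllegalStateException alreadyUsed) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Stream<Integer> used = Stream.of(1, 2, 3);
        used.forEach(x -> { });                                     // consume it
        System.out.println(tryOpen(used).isPresent());              // prints false
        System.out.println(tryOpen(Stream.of(1, 2, 3)).isPresent()); // prints true
    }
}
```

Processing can continue from the returned spliterator via StreamSupport.stream(spliterator, false).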

I would wish for some code snippet that can be used to validate assumptions about a stream before processing it, like:

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream,
    /* max threshold of elements before IllegalArgumentException */
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    );
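
To make the wish concrete, here is a rough sketch of what such a guard could look like when built on the stream's spliterator. Everything here is hypothetical: `StreamPreconditions.check` is not a JDK API, and the names and exception choices are assumptions. It rejects unordered streams up front, rejects streams whose known size exceeds the threshold, and otherwise counts elements as they flow through.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper; nothing like this ships with the JDK.
class StreamPreconditions {

    static <T> Stream<T> check(Stream<T> stream, long maxElements, boolean requireOrdered) {
        boolean parallel = stream.isParallel();
        // spliterator() is a terminal operation: 'stream' must not be used afterwards.
        Spliterator<T> source = stream.spliterator();

        if (requireOrdered && !source.hasCharacteristics(Spliterator.ORDERED)) {
            throw new IllegalArgumentException("stream is not ordered");
        }
        // Known sizes can be rejected up front; -1 means "unknown".
        if (source.getExactSizeIfKnown() > maxElements) {
            throw new IllegalArgumentException("stream has more than " + maxElements + " elements");
        }

        AtomicLong seen = new AtomicLong();
        return StreamSupport.stream(source, parallel)
                .onClose(stream::close) // keep the original stream's close handlers reachable
                .peek(element -> {
                    if (seen.incrementAndGet() > maxElements) {
                        throw new IllegalArgumentException(
                                "stream emitted more than " + maxElements + " elements");
                    }
                });
    }

    public static void main(String[] args) {
        List<Integer> sorted = check(Stream.of(3, 1, 2), 10_000, true)
                .sorted()
                .collect(Collectors.toList());
        System.out.println(sorted);

        try {
            // Infinite but ordered: the guard trips instead of sorting forever.
            check(Stream.iterate(0, i -> i + 1), 10_000, true).sorted().findFirst();
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The counting guard costs one atomic increment per element, and short-circuiting operations downstream may stop before the threshold is ever reached, so it only reports an overflow when the pipeline actually pulls that many elements.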

tkruse
  • I guess a lot of "it depends". Would wait for someone like Holger to answer this, if not considered as broad. – Naman Jun 10 '19 at 16:05
  • I think you could use stream characteristics for it - check [this](https://stackoverflow.com/questions/44486238/how-to-correctly-find-the-stream-characteristics-in-java-8) question for more details. – Oleksandr Pyrohov Jun 10 '19 at 16:23
  • Thx, I did not know about spliterator characteristics. Still they do not look like something to use in application programming (rather like an implementation detail of Stream). – tkruse Jun 10 '19 at 23:38
  • maybe this answers some of your questions https://www.baeldung.com/java-stream-ordering – JavaMan Jun 24 '19 at 11:04
  • As Brian says himself in the answer you posted, "The one case where you must return a Collection is when there are strong consistency requirements". Requiring it being finite is one of those. – daniu Jul 08 '19 at 05:08
  • @daniu: But he explicitly suggests to use Streams for finite data, which means being finite is not a strong consistency requirement to him. – tkruse Jul 08 '19 at 05:28
  • Unless you assume that the author of a method knows all uses (current and future) of a method and thus knows that to the clients, being finite is not a consistency requirement. – tkruse Jul 08 '19 at 05:29

1 Answer

After looking into this a bit (some experimentation and here), as far as I can see there is no way to know definitively whether a stream is finite or not.

More than that, sometimes it is not even determined except at runtime (for example, since Java 9: IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

What you can do is:

  1. In some cases you can find out with certainty that it is finite, in a few ways (note that a negative result on these does not mean it is infinite, only that it may be):

    1. stream.spliterator().getExactSizeIfKnown() - if the stream has a known exact size, it is finite; otherwise this returns -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - returns true if the stream is SIZED.

  2. You can safeguard yourself by assuming the worst (depends on your case).

    1. stream.sequential()/stream.parallel() - explicitly set your preferred consumption type.
    2. With a potentially infinite stream, assume the worst case in each scenario.

      1. For example, assume you want to listen to a stream of tweets until you find one by Venkat - it is a potentially infinite operation, but you'd like to wait until such a tweet is found. So in this case, simply go for stream.filter(tweet -> isByVenkat(tweet)).findAny() - it will iterate until such a tweet comes along (or forever).
      2. A different scenario, and probably the more common one, is wanting to do something on all the elements, or only to try for a certain amount of time (similar to a timeout). For this, I'd recommend always calling stream.limit(x) before calling your operation (collect or allMatch or similar), where x is the number of tries you're willing to tolerate.
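
The size check and the limit(x) guard from the list above can be sketched as follows. One caveat: spliterator() is a terminal operation, so the stream that was inspected cannot be processed again directly. The class `FiniteCheck` and its method names are made up for illustration.

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

class FiniteCheck {
    // Returns the exact size if the stream reports one, or -1 if unknown.
    // -1 means "possibly infinite", not "infinite".
    static long exactSizeOrMinusOne(Stream<?> stream) {
        // Terminal operation: the stream is consumed by this call.
        return stream.spliterator().getExactSizeIfKnown();
    }

    // Bounds a possibly infinite stream before a non-short-circuiting check.
    static boolean firstElementsAllPositive(Stream<Integer> stream, long maxTries) {
        return stream.limit(maxTries).allMatch(x -> x > 0);
    }

    public static void main(String[] args) {
        System.out.println(exactSizeOrMinusOne(IntStream.range(0, 100).boxed()));       // 100
        System.out.println(exactSizeOrMinusOne(Stream.generate(() -> 1)));              // -1
        // Finite, but the size is no longer known after filter():
        System.out.println(exactSizeOrMinusOne(Stream.of(1, 2, 3).filter(x -> x > 1))); // -1
        System.out.println(firstElementsAllPositive(Stream.iterate(1, i -> i + 1), 1_000)); // true
    }
}
```

The filter() case shows why a negative answer is inconclusive: the stream is clearly finite, yet it no longer reports a size.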

After all this, I'll just mention that I think returning a stream is generally not a good idea, and I'd try to avoid it unless there are large benefits.

tkruse
orirab
  • .spliterator() is a method I think, not a public field. Also you can copy the checking of SIZED for ORDERED, I guess? I think it should be possible to have a counter for the elements as the stream is processed, so that even for potentially infinite streams, it can throw an exception if more elements are emitted than I maximally expected (at performance cost of course). Else nice answer so far. – tkruse Jun 26 '19 at 10:01
  • spliterator - correct. ordered - the problem is that it can be ordered only if it is finite, otherwise it will take forever (for example, `Stream.generate(random::nextInt).sorted()` will cause an intellij warning), so checking for ordered is a bit redundant. Instead of keeping a counter and inc. it yourself, why not use `limit(x)` as maximum? – orirab Jun 26 '19 at 10:09
  • Limit does not tell you that there were more. E.g. when calling max on a very long stream that might be infinite, it is safer to throw an exception than to return a wrong number. – tkruse Jun 26 '19 at 12:16
  • I'm not too sure of that - it very much depends on your use-case, but I see your point. – orirab Jun 26 '19 at 12:22
  • If this is a fitting answer, would you consider accepting it? – orirab Jun 26 '19 at 21:02