0

Consider a Java Stream object generated by a method:

// get the Stream object
Stream<String> dataCollection = myFunctionToGenerateStreamObj(...);

// get the count for logging purposes (note: count() returns long)
long total = dataCollection.count();

// iterate over each object and do something
dataCollection.forEach( obj -> {
    anotherFunctionToProcessObj(obj);
});

Obviously, the above code will throw

IllegalStateException: stream has already been operated upon or closed

on the iteration line. I understand there is a solution using the Supplier class. I'm just wondering what the best practice is to code the above logic in the most common and efficient way.

Dreamer
  • Streams in Java are immutable and each operation on a Stream generates a new Stream object, hence we cannot reuse a stream. Streams are traversable only once. If you have traversed a stream once, it is said to be consumed. To traverse it again we need to get a new stream. – Manjunath H M Sep 28 '22 at 06:08
  • At the line dataCollection.count(), the stream is already used, so you cannot reuse the same stream, i.e. dataCollection. – Manjunath H M Sep 28 '22 at 06:09

4 Answers

0

The point is that the act of creating and iterating is potentially expensive.

Consider this simple one: Files.lines. It opens the file and reads it, line by line, exposing that as a Stream<String>.

You can, of course, wrap this in a supplier: () -> Files.lines(somePath) and pass that around. Anybody you gave that supplier to can invoke it, which results in the OS opening a file handle, and asking the disk to go spit out some bits.
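The supplier idea can be sketched like this (a minimal sketch; `Stream.of` stands in here for whatever expensive source you actually have, such as `Files.lines`):

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SupplierDemo {
    public static void main(String[] args) {
        // Each call to get() builds a brand-new Stream, so every
        // terminal operation pays the full setup cost again.
        Supplier<Stream<String>> streamSupplier = () -> Stream.of("a", "b", "c");

        long total = streamSupplier.get().count();          // first traversal
        streamSupplier.get().forEach(System.out::println);  // second traversal
        System.out.println("total = " + total);
    }
}
```

Note that the two traversals are completely independent: if the source were a file, it would be opened and read twice.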

But, the point is, the cost here is in producing the stream itself. The actual thing you do with the stream? Dwarfed, utterly, by the disk access (though with fast SSDs this matters less; if it helps, imagine the file is loaded over a network connection. That is metered).

Some streams aren't anything like that. If you have an ArrayList and you call .stream() on it, the 'processing' cost of the stream itself (making the stream object, and streaming through the list's elements) is an ignorable cost; it's just forwarding a memory pointer, through contiguous memory no less. It doesn't even cache-miss much.

But Stream is an abstraction - the point is that it sucks to have to care. It's bad to have to document "if you pass a supplier that supplies a stream with high setup and processing costs, this code is quite slow, and if it's loaded across a metered network it'll really rack up your bill".

So, don't. Which means, the best thing to do is to stream only once.

Unfortunately, there is no simple way to make two Stream objects that are powered by a single pass through the underlying source.

One way to do this count thing would be:

AtomicInteger counter = new AtomicInteger();
Stream<String> dataCollection = ....;
dataCollection.peek(x -> counter.incrementAndGet()).forEach(...);
// counter.get() now holds the number of elements that passed through

It's annoying to have to do it like this. A key problem here is that lambdas don't like changing state, and methods can only return one thing.

rzwitserloot
    [`Stream.peek()` is a *really* bad choice here](https://stackoverflow.com/q/33635717/1079354), since it's not the way the API is meant to be used. – Makoto Sep 28 '22 at 06:03
  • @Makoto isn't it the only sane way to count the number of objects processed in a stream? – Keshavram Kuduwa Sep 28 '22 at 06:42
  • @KeshavramKuduwa: No? I offer such an approach. If the stream is infinite then the stream won't be able to tell you until you stop consuming it, so the consumer is better positioned to tell you what it's consumed and not the firehose it's consuming from. – Makoto Sep 28 '22 at 06:52
  • @Makoto you can't just fly by, say 'this is bad', and don't elaborate. It sounds like you have no idea how it works, maybe? `peek` wouldn't consume the firehose, and all ends of a stream are the same. peek just counts items as they travel through the stream. It's the `forEach` that would drain the firehose (and as the forEach consumes, `counter` dutifully ticks up). – rzwitserloot Sep 28 '22 at 12:40
  • 1
    @rzwitserloot: I provide a link to a previous conversation in which someone wanted to leverage `peek` to modify state. But since you asked, I'll provide a more direct link [to the API](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#peek-java.util.function.Consumer-), in which it is *very clearly stated* that this API only exists for debugging purposes. Using the API in an unintended way is going to have unintended consequences, especially if the API changes. – Makoto Sep 28 '22 at 14:51
  • @Makoto I don't know why you are on a tirade and you feel like lying is fine. It _very clearly states_ that it is _only_ for debugging? Click your own links. It says 'mainly'. The gap between 'only' and 'mainly' is rather large, don't you think? – rzwitserloot Sep 28 '22 at 16:03
  • @rzwitserloot: No? If I publish something for someone to use as a debug utility, and then it's morphed into something that was unintended, then I introduce a behavior change to that which breaks the unintended features, then that becomes a major issue. Again, in the first link I put in the comment thread, using the API for unintended purposes is a ***bad idea***. `peek` is not intended to be used as a way to perform these kinds of intermediate operations, and promoting it as one such way is actively harmful. – Makoto Sep 28 '22 at 16:58
  • What are you on about? The `peek` docs explain precisely what peek does. It furthermore states that it is _mainly_ intended for debugging. You're seeing things. – rzwitserloot Sep 28 '22 at 21:00
0

If it is not an unbounded stream, what about map?

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6);
long count = integers.stream().map(x -> {
    System.out.println("x = " + x);
    return 1;
}).count();
System.out.println("count = " + count);
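One caveat: since Java 9, the `Stream.count()` API note says an implementation may skip executing the pipeline entirely if it can compute the count directly from the source, so the `println` inside `map` is not guaranteed to run. A variant that forces traversal (a sketch using `collect` with `Collectors.counting()`, which always processes every element):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CountDemo {
    public static void main(String[] args) {
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6);
        // collect() always traverses the pipeline, so the side effect
        // in map() is guaranteed to run, unlike count() on Java 9+.
        long count = integers.stream()
                .map(x -> {
                    System.out.println("x = " + x);
                    return x;
                })
                .collect(Collectors.counting());
        System.out.println("count = " + count);
    }
}
```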
Hi computer
-1

I think what you have can be represented as a special case of reduction. More specifically, Stream.reduce(identity, accumulator, combiner).

int totalCount = dataCollection
        .reduce(0, (currentCount, obj) -> {
          anotherFunctionToProcessObj(obj);
          return currentCount + 1;
        }, Integer::sum);
System.out.println(totalCount);
  1. Identity is the initial value for the counter - 0
  2. The accumulator function will process the object and return the current count increased by 1
  3. The combiner will sum the results of two accumulators

As stated in the Stream docs, the preferred way is reduction:

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.

As pointed out in the comments, if the stream is infinite, you won't be able to reduce it (it's not possible to get the total count anyway). In this case an external counter can be used to keep track of the number of processed elements:

AtomicInteger counter = new AtomicInteger(0);
dataCollection.forEach(obj -> {
    anotherFunctionToProcessObj(obj);
    counter.incrementAndGet();
});

Keep in mind this is OK for this particular corner case only, as changing external state via side effects is bad practice.

As another option, you can just get the stream's iterator and use a simple while loop:

Iterator<String> iterator = dataCollection.iterator();
int count = 0;
while (iterator.hasNext()) {
    count++;
    anotherFunctionToProcessObj(iterator.next());
}
System.out.println(count);
Chaosfire
  • How can you reduce an infinite stream, supposing that the stream was infinite (and there's nothing specifying that the stream *couldn't* be infinite)? Wouldn't getting the iterator just put a lot of data into memory? – Makoto Sep 28 '22 at 06:25
  • @Makoto Nothing in the question indicates the stream being infinite, quite on the contrary, OP is already getting the count once, before attempting iteration. Because of this we can safely assume the stream is finite. – Chaosfire Sep 28 '22 at 07:00
  • @Makoto About the iterator approach, there isn't a single `Iterator` implementation, which loads all objects in memory, it beats the entire purpose of an iterator. There is no indication `Stream.iterator()` would do it either. Actually, if you check the implementation, this iterator advances using the stream's spliterator, so it does not load lots of data in memory. – Chaosfire Sep 28 '22 at 07:07
  • I see where you're coming from, but there's the significant caveat that someone is going to come in with an unbound stream and try to use this solution and be surprised why they're getting a different answer. – Makoto Sep 28 '22 at 15:06
  • @Makoto I agree that someone might come here with infinite stream situation, and in those cases it can be problematic, but an answer needs to have the context of the question in mind. It's rare that we can have a generic fit all purposes solution and this situation certainly is not one of them. In the context of the current question reducing the stream is fine. But i still added a warning, just in case. – Chaosfire Sep 29 '22 at 08:11
-3

The stream you're pulling data from should not care about how large it is. After all, streams can be unbounded in size. This is also important in the context of what it is you're streaming; streaming whole objects is a lot different than streaming individual characters from a data source that is whole terabytes in size.

So...the only place you could realistically keep track of what you've seen is where the data is processed. In anotherFunctionToProcessObj, since you're just...consuming the data (it doesn't look like you have to worry about threads too much), you could keep a tally inside that class.

Because you're leveraging forEach, this already creates a side effect, and while you shouldn't look to create side effects when operating on streams, there's no better way to do this without simply disregarding the requirement to count the elements.

This way, you only ever stream the data once and you keep track of what you process in the process.
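A sketch of what keeping the tally inside the processing class might look like (the `Processor` class and its method names here are hypothetical, standing in for the OP's anotherFunctionToProcessObj):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

// Hypothetical processor; the tally lives next to the code that
// actually consumes the data, so the stream stays side-effect-free.
class Processor {
    private final AtomicLong processed = new AtomicLong();

    void process(String obj) {
        // ... do the real work with obj here ...
        processed.incrementAndGet();
    }

    long processedCount() {
        return processed.get();
    }
}

public class ConsumerCountDemo {
    public static void main(String[] args) {
        Stream<String> dataCollection = Stream.of("a", "b", "c");
        Processor processor = new Processor();
        dataCollection.forEach(processor::process); // single traversal
        System.out.println("processed = " + processor.processedCount());
    }
}
```

AtomicLong is used so the tally also survives a parallel stream, at the cost of some contention.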

Makoto
  • Counting processed elements in `anotherFunctionToProcess` would create either stateful function, or function with side effects, wouldn't it? While that is ok for sequential streams, in the case of a parallel stream that would require synchronization, which mostly kills the benefits of parallelism. – Chaosfire Sep 28 '22 at 07:42
  • `peek()` is specifically designed to let you peek at what is being consumed, hence the name. – rzwitserloot Sep 28 '22 at 12:41
  • @Chaosfire: I don't think it would kill the benefit of parallelization, but it would be the moment to say, "Hey, we're counting a stream that has more elements than a long can hold, why are we counting this again?" To your point about the function with side effects, *yes* that is very true, but `forEach` [only exists to explicitly create side-effects](http://www.javapractices.com/topic/TopicAction.do?Id=278), so...it's a bit of a moot point. – Makoto Sep 28 '22 at 14:56
  • @rzwitserloot: Yes, but you're not meant to be consuming anything or taking any action besides a debug operation. You could log something out with `peek` but the OP isn't asking to log anything out, they're looking to introduce a side effect by counting while running the stream. That is *bad*. – Makoto Sep 28 '22 at 14:56
  • @Makoto it depends on what you care about. If you want to measure how many stream elements are being introspected, `peek` is what you want. If you want to measure how large the stream is regardless of anything else, that's more complicated, and we need to look a the terminating operation. – rzwitserloot Sep 28 '22 at 16:02
  • @Makoto I said it kills the parallelism benefit mostly, not entirely, that's the downside. And how often do we have a stream, with more elements than max value long? While the possibility certainly exists, it's an extreme corner case. Even then, as long as the stream is finite, we can reduce to BigInteger, if needed. Even if we ignore side effects and statefulness, counting in the consumer will give us the current count, not the total count as requested. And it's not like we can get the total count of elements in infinite stream, unless we use short-circuiting operation. – Chaosfire Sep 29 '22 at 08:17
  • You raise a valid point about infinite stream, but unless the question specifies having such corner case scenario, this is outside of scope. Counting in the consumer is suboptimal, unless we enter this specific corner case. There are better solutions for the normal scenario(finite stream). – Chaosfire Sep 29 '22 at 08:18
  • @Chaosfire: Given that people copy code from Stack Overflow all the time, *and* given that people automatically assume that a collection is congruent to a stream, I figured it'd be better to solve the problem in the stream approach as opposed to the "I want to iterate a stream like I do with a collection"-approach. I disagree that there are better solutions for this, since if one thinks of this from a streaming perspective, then...how many things you've processed is ultimately up to the consumer to count, but even if it were unbounded, that would be...impossible to do. – Makoto Sep 29 '22 at 15:31
  • @Makoto Counting in the consumer is **bad** use of side-effects, because it changes **external** state. The stream docs you linked also state so, and that reduction is the preferred way - `Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.` The example with the ArrayList, which does exactly that (changing the external state of the ArrayList), is given as an example of how **not** to use side effects. – Chaosfire Sep 29 '22 at 17:31
  • Exactly because people copy code from Stack Overflow all the time, (it's their responsibility to understand what and why the code does, but that's different topic), we should not advocate a practice, discouraged by the docs, unless the situation requires us to look for workarounds - it does not. – Chaosfire Sep 29 '22 at 17:32