
Imagine we have a Collector, and we want to feed it the contents of a succession of Streams.

The most natural way of doing it would be to concatenate the Streams and feed the concatenation to the Collector. But this might not be optimal: for example, if each Stream reads from a scarce resource allocated with a try-with-resources, it would be expensive to keep all the Streams open at once.

Also, sometimes we might not even have direct access to the Streams. We might only have an opaque method that "feeds" a Collector that it receives as a parameter, and returns the result.

How can we feed a Collector from multiple sources in those cases?

danidiaz
  • Just use `flatMap`; the streams will not exist at the same time and the documentation even specifies that `close` will be invoked after all elements have been processed. – Holger May 11 '21 at 10:51
  • See [this answer](https://stackoverflow.com/a/54829307/2711488) for a discussion of `flatMap` vs try-with-resources. – Holger May 11 '21 at 12:29
  • @Holger That answer says that putting try-with-resources inside the `flatMap` callback is problematic. And pulling all the try-with-resources "outside" would be wasteful. So my question still stands. – danidiaz May 11 '21 at 12:47
  • For a concrete situation, imagine that we want to consume with a `Collector` the lines of every file inside a 10000-file folder, using `Files.lines()` (see https://stackoverflow.com/questions/34072035/why-is-files-lines-and-similar-streams-not-automatically-closed), without having at any point more than a single file descriptor open, and also ensuring proper cleanup in the face of any I/O exception that may crop up while reading. – danidiaz May 11 '21 at 12:54
  • Your objections simply don’t make any sense. You should read answers to the end, not just the first few lines. Again, **flatMap closes the stream**; try-with-resources is just about closing the resource, which is not necessary with flatMap, because **flatMap closes the stream**. It’s not even clear what “pulling all the try-with-resources outside” is supposed to mean. You cannot use try-with-resources with flatMap. Period. And you don’t need to use try-with-resources with flatMap, because **flatMap closes the stream**. Period. – Holger May 11 '21 at 13:12
  • @Holger It does seem like `flatMap` closes sub-streams even in the event of an exception. I feel that the official documentation is a bit ambiguous in that respect. https://docs.oracle.com/en/java/javase/15/docs/api/java.base/java/util/stream/Stream.html#flatMap(java.util.function.Function) It says "Each mapped stream is closed after its contents have been placed into this stream." but it wasn't clear, at least to me, whether that also covered exceptional termination, in addition to normal sub-stream exhaustion. – danidiaz May 11 '21 at 15:51

2 Answers


One solution is to use this auxiliary duplicate function:

static <T,A,R> Collector<T,A,Collector<T,A,R>> duplicate(Collector<T,A,R> collector) {
    final Supplier<A> supplier = collector.supplier();
    final BiConsumer<A, T> accumulator = collector.accumulator();
    final BinaryOperator<A> combiner = collector.combiner();
    final Function<A, R> finisher = collector.finisher();
    final Function<A, Collector<T,A,R>> newFinisher = (finalState) ->
            Collector.of(() -> finalState, accumulator, combiner, finisher);
    return Collector.of(supplier,accumulator,combiner,newFinisher);
}

duplicate takes a Collector and returns a Collector just like the first one, except that instead of the original result type, it returns another Collector as result, one that we can later pass to further Streams:

public static void main( String[] args )
{
    final Collector<Integer, ?, Integer> summy0 =
         Collectors.summingInt(i -> i);

    final Collector<Integer, ?, Integer> summy1 =
         Stream.<Integer>of(1, 2, 3).collect(duplicate(summy0));

    final Collector<Integer, ?, Integer> summy2 =
         Stream.<Integer>of(4, 5, 6).collect(duplicate(summy1));

    // Prints 21 when every stream is sequential, as here
    System.out.println(Stream.<Integer>of().collect(summy2));
}
danidiaz
  • The contract of a collector’s supplier is to return a new empty container on each evaluation. Always returning the same instance, not to speak of a non-empty result of a previous operation, violates the contract. So it doesn’t come as a surprise that adding `.parallel()` to the second stream will produce invalid results. – Holger May 11 '21 at 13:34
  • @Holger You're right, it appears that my solution breaks the contract. – danidiaz May 11 '21 at 15:44
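
As an illustration of the contract issue raised in the comments above, here is a minimal sketch of a different approach that calls the collector's supplier exactly once and then drives the accumulator by hand. `CollectorFeeder` and its methods are hypothetical names, not part of the JDK, and the sketch assumes strictly sequential feeding and a single call to `finish()`. It does not cover the case where only an opaque method that accepts a `Collector` is available.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a JDK class): feeds several streams, one after
// another, into a single accumulation container obtained from the collector.
final class CollectorFeeder<T, A, R> {
    private final BiConsumer<A, T> accumulator;
    private final Function<A, R> finisher;
    private final A state;

    private CollectorFeeder(Collector<T, A, R> collector) {
        this.accumulator = collector.accumulator();
        this.finisher = collector.finisher();
        // The supplier is evaluated once, so it still hands out a fresh container.
        this.state = collector.supplier().get();
    }

    static <T, A, R> CollectorFeeder<T, A, R> of(Collector<T, A, R> collector) {
        return new CollectorFeeder<>(collector);
    }

    // Consumes the stream sequentially and closes it before returning,
    // even if accumulation throws.
    CollectorFeeder<T, A, R> feed(Stream<? extends T> source) {
        try (Stream<? extends T> s = source) {
            s.sequential().forEachOrdered(t -> accumulator.accept(state, t));
        }
        return this;
    }

    // Assumption: called once, after the last feed().
    R finish() {
        return finisher.apply(state);
    }

    public static void main(String[] args) {
        Integer total = CollectorFeeder.of(Collectors.summingInt((Integer i) -> i))
                .feed(Stream.of(1, 2, 3))
                .feed(Stream.of(4, 5, 6))
                .finish();
        System.out.println(total); // prints 21
    }
}
```

Because the combiner is never used, this only makes sense for sequential processing; it trades the parallel capabilities of the Stream API for an explicit, contract-respecting accumulation loop.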

Instead of concatenating Streams that have been previously allocated with a try-with-resources, a possible solution is to splice them into a "top-level" Stream using flatMap, and then consume the resulting Stream with the Collector.

The usual recommendation for safe resource handling with Streams is to use try-with-resources. However, flatMap behaves specially in that respect: it itself ensures that the spliced sub-Streams are closed, both when they are "exhausted" in the main Stream, and when the Stream is interrupted because of an exception.

To my mind, the wording in the flatMap javadocs feels a bit ambiguous about cleanup in the face of exceptions:

Each mapped stream is closed after its contents have been placed into this stream.

But this experiment shows that sub-Streams are closed even when an exception crops up:

// This prints "closed!" before the stack trace
Stream.of(1,2,3)
.flatMap((i) ->
     Stream.<String>generate(() -> { throw new RuntimeException(); })
     .limit(2)
     .onClose(() -> System.err.println("closed!"))
).forEach(System.err::println);
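
Applied to the scenario from the question's comments (collecting the lines of every file in a folder), the same idea might look like the sketch below. `FlatMapLines`, `collectAllLines`, `linesOf` and `closedCount` are hypothetical names introduced for illustration; the counter exists only to make the closing of each per-file stream observable. The directory listing is still wrapped in try-with-resources, since it is the top-level Stream rather than a sub-Stream handled by flatMap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapLines {
    // Illustration only: counts how many per-file streams have been closed.
    static final AtomicInteger closedCount = new AtomicInteger();

    static <R> R collectAllLines(Path dir, Collector<String, ?, R> collector) throws IOException {
        // The top-level stream is closed by try-with-resources;
        // each per-file stream is closed by flatMap.
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(Files::isRegularFile)
                    .sorted() // deterministic order; only paths are buffered, no file is opened yet
                    .flatMap(FlatMapLines::linesOf)
                    .collect(collector);
        }
    }

    // Opens one file lazily; checked I/O errors are rethrown unchecked
    // because the flatMap callback cannot throw IOException.
    static Stream<String> linesOf(Path file) {
        try {
            return Files.lines(file).onClose(closedCount::incrementAndGet);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flatmap-demo");
        Files.write(dir.resolve("a.txt"), List.of("one", "two"));
        Files.write(dir.resolve("b.txt"), List.of("three"));

        List<String> all = collectAllLines(dir, Collectors.toList());
        System.out.println(all + " closed=" + closedCount.get());
    }
}
```

Each file is opened only when flatMap reaches its path, so the cost of the approach is wrapping `IOException` in `UncheckedIOException` rather than holding many descriptors open.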
danidiaz
  • "Closing Java Streams with AutoCloseable" https://mikemybytes.com/2021/01/26/closing-java-streams-with-autocloseable/ – danidiaz May 12 '21 at 10:27