I'm creating multiple streams that I have to access in parallel (or possibly in parallel). I know how to write a try-with-resources statement when the number of resources is fixed at compile time, but what if the number of resources is determined by a parameter?

I have something like this:

private static void foo(String path, String... files) throws IOException {
    @SuppressWarnings("unchecked")
    Stream<String>[] streams = new Stream[files.length];

    try {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams[i] = Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file));
        }

        // do something with streams
        Stream.of(streams)
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
    finally {
        for (Stream<String> s : streams) {
            if (s != null) {
                s.close();
            }
        }
    }
}
Stuart Marks
Mark Jeronimus
  • Are you asking if there's a try-with-resources that would handle your situation? The answer is no, but what you have is just fine. – Kayaman May 07 '15 at 13:49
  • An alternative would be to move the opening of the streams into the parallel operations, with each one only having to deal with one stream. – biziclop May 07 '15 at 13:53
  • Yes, although there's a catch: the `close()` could in theory (though unlikely in practice) throw an `UncheckedIOException`, so you should probably wrap the `s.close()` in a `try { s.close(); } catch (Exception ex) { //quash or log }`. – Andrew Janke May 07 '15 at 13:56
  • Beware! Your combination of `sorted()`/`distinct()` and `forEach` (rather than `forEachOrdered`) is a recipe for problems. See http://stackoverflow.com/q/28259636/2711488 – Holger May 07 '15 at 16:35

2 Answers

You could write a composite AutoCloseable that manages a dynamic number of AutoCloseable instances:

import java.util.ArrayList;
import java.util.List;

public class CompositeAutoclosable<T extends AutoCloseable> implements AutoCloseable {
    private final List<T> components= new ArrayList<>();

    public void addComponent(T component) { components.add(component); }

    public List<T> getComponents() { return components; }

    @Override
    public void close() throws Exception {
        Exception e = null;
        for (T component : components) {
            try { component.close(); }
            catch (Exception closeException) {
                if (e == null) { e = closeException; }
                else { e.addSuppressed(closeException); }
            }
        }
        if (e != null) { throw e; }
    }
}

and you could use it in your method:

private static void foo(String path, String... files) throws Exception {
    try (CompositeAutoclosable<Stream<String>> streams 
            = new CompositeAutoclosable<Stream<String>>()) {
        for (int i = 0; i < files.length; i++) {
            final String file = files[i];
            streams.addComponent(Files.lines(Paths.get(path, file))
                .onClose(() -> System.out.println("Closed " + file)));
        }
        streams.getComponents().stream()
            .parallel()
            .flatMap(x -> x)
            .distinct()
            .sorted()
            .limit(10)
            .forEach(System.out::println);
    }
}
gontard
  • Although I dislike creating custom utility classes, it at least works. The reason I dislike it? When other developers working on (different parts of) the project have a similar need, they might not know about the utility class and design their own solution. – Mark Jeronimus May 07 '15 at 15:20
  • This could be solved by communication between developers. "Yesterday I had this problem, so I wrote a utility..." – gontard May 07 '15 at 15:27
  • If you catch a `closeException` and there’s already a previous exception you should use [`addSuppressed`](http://docs.oracle.com/javase/8/docs/api/java/lang/Throwable.html#addSuppressed-java.lang.Throwable-) instead of overwriting the previous exception… – Holger May 07 '15 at 16:18
  • Isn't it bad to just catch / throw `Exception`? Could you not use `IOException` here instead? – Jonas Czech May 07 '15 at 17:35
  • No, unfortunately AutoCloseable declares `close() throws Exception`. @Holger good to know. – Mark Jeronimus May 07 '15 at 19:18
  • @JonasCz: unfortunately, `Stream`s implement `AutoCloseable` rather than `Closeable`. While `Stream.close` does not declare checked exceptions, the `interface` does. So writing the code without declaring `throws Exception` would require special-casing `Stream` instead of treating it generically via the `interface`. – Holger May 07 '15 at 19:23
  • @Holger i have updated the code to manage several exceptions. – gontard May 11 '15 at 07:53
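
To illustrate the `addSuppressed` point from the comments, here is a minimal sketch of the same aggregation pattern reduced to a static helper. The class name `SuppressedCloseDemo` and the methods `closeAll` and `suppressedCount` are hypothetical names chosen for this example, not part of the answer's code. It assumes every resource throws a distinct exception instance.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressedCloseDemo {

    // Hypothetical helper: closes every resource, keeps the first failure as
    // the primary exception and attaches later failures as suppressed ones.
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Builds three fake resources, two of which fail on close, and reports
    // how many failures ended up attached to the primary exception.
    static int suppressedCount() {
        List<AutoCloseable> resources = new ArrayList<>();
        resources.add(() -> { throw new IllegalStateException("first"); });
        resources.add(() -> { });
        resources.add(() -> { throw new IllegalStateException("second"); });
        try {
            closeAll(resources);
            return -1; // not reached: at least one close() fails
        } catch (Exception e) {
            return e.getSuppressed().length;
        }
    }

    public static void main(String[] args) {
        System.out.println("Suppressed exceptions: " + suppressedCount());
    }
}
```

All three resources get a `close()` call even though the first one fails, and the second failure is reachable through `getSuppressed()` on the exception that is finally thrown.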

The documentation of Stream.flatMap says:

Each mapped stream is closed after its contents have been placed into this stream.

In other words, for ordinary closing of the streams, there is no additional action necessary. However, since only processed streams are closed, you shouldn’t create the streams eagerly without knowing whether they are later processed by the stream:

private static void foo(String path, String... files) throws IOException {
    Arrays.stream(files).flatMap(file-> {
              try { return Files.lines(Paths.get(path, file))
                    .onClose(() -> System.out.println("Closed " + file)); }
              catch(IOException ex) { throw new UncheckedIOException(ex); } })
          .parallel()
          .distinct()
          .sorted()
          .limit(10)
          .forEachOrdered(System.out::println);
}

By creating the sub-streams within flatMap, it’s guaranteed that each is only created if the stream is going to process it. Thus, this solution will close all sub-streams even without having the outer Stream inside a try-with-resources statement.
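
The closing behavior can be observed without touching the file system. The following is a minimal sketch that uses in-memory sub-streams in place of `Files.lines`; the class name `FlatMapCloseDemo` and the method `closedAfterFlatMap` are hypothetical names for this example. It records which sub-streams had their `onClose` handler invoked, with no try-with-resources statement anywhere.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapCloseDemo {

    // Hypothetical helper: runs a sequential flatMap pipeline over the given
    // names and returns the names whose sub-stream onClose handler was run.
    static List<String> closedAfterFlatMap(String... names) {
        List<String> closed = Collections.synchronizedList(new ArrayList<>());
        Stream.of(names)
            // Each sub-stream is created inside flatMap, as in the answer.
            .flatMap(name -> Stream.of(name + "-1", name + "-2")
                .onClose(() -> closed.add(name)))
            .collect(Collectors.toList());
        return new ArrayList<>(closed);
    }

    public static void main(String[] args) {
        System.out.println("Closed: " + closedAfterFlatMap("a", "b"));
    }
}
```

Every sub-stream that flatMap consumed shows up in the returned list, which matches the documentation sentence quoted above.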

Holger
  • While I'm sure your solution works, it's very hard to read and see what's going on in each statement. Every developer should prefer easy-to-read code over optimized or clever code, or it will cause maintenance problems or bugs. – Mark Jeronimus May 07 '15 at 19:12
  • Are you sure your first solution reads the files in parallel like the original code in the question? I think `parallel` should be applied before `flatMap`. – Przemyslaw Zych May 09 '15 at 15:34
  • @Przemyslaw Zych: the placement of `parallel()` is irrelevant; it makes the entire processing parallel, because a stream pipeline as a whole is either parallel or sequential. – Holger May 11 '15 at 08:42
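
The last comment can be checked with `isParallel()`, which reports the mode of the whole pipeline. This is a minimal sketch; the class name `ParallelPlacementDemo` and its method names are hypothetical and exist only for this example.

```java
import java.util.stream.Stream;

public class ParallelPlacementDemo {

    // parallel() called before the intermediate operation.
    static boolean parallelFirst() {
        return Stream.of("a", "b", "c")
            .parallel()
            .map(String::toUpperCase)
            .isParallel();
    }

    // parallel() called after the intermediate operation.
    static boolean parallelLast() {
        return Stream.of("a", "b", "c")
            .map(String::toUpperCase)
            .parallel()
            .isParallel();
    }

    // A later sequential() call switches the whole pipeline back.
    static boolean parallelThenSequential() {
        return Stream.of("a", "b", "c")
            .parallel()
            .map(String::toUpperCase)
            .sequential()
            .isParallel();
    }

    public static void main(String[] args) {
        System.out.println("parallel first:           " + parallelFirst());
        System.out.println("parallel last:            " + parallelLast());
        System.out.println("parallel then sequential: " + parallelThenSequential());
    }
}
```

Both placements report a parallel pipeline, and a later `sequential()` call overrides an earlier `parallel()`, so the mode is a property of the pipeline rather than of individual stages.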