
Today I tried to refactor the following code, which reads IDs from the files in a directory,

Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) {
    InputStream stream = fileSystem.openInputStream(fileName);
    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) {
        ids.add(Long.valueOf(line.trim()));
    }
    br.close();
}

using the Stream API:

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileSystem::openInputStream)
    .map(is -> new BufferedReader(new InputStreamReader(is)))
    .flatMap(BufferedReader::lines)
    .map(String::trim)
    .map(Long::valueOf)
    .collect(Collectors.toSet());

Then I realized that the I/O streams are never closed, and I don't see a simple way to close them, because they are created inside the pipeline.

Any ideas?

Update: the FileSystem in the example is HDFS, so Files#lines and similar methods can't be used.

AdamSkywalker
  • Also, don't forget that you need to close the Stream created by fileSystem.list("my-directory").stream() if you are accessing the actual file system. Streams are `AutoCloseable`, so this can be done with a try-with-resources block. – Skepi Apr 25 '17 at 11:22
  • @GeraldMücke I checked it; neither solution helps here. The try-with-resources approach has exactly the same problem as Luan Nico's answer. – AdamSkywalker Apr 25 '17 at 11:51
  • @JarrodRoberson since you had time to close the question, could you elaborate on how the wrapper solution or the try-with-resources approach from that answer would help here? – AdamSkywalker Apr 25 '17 at 20:19

3 Answers


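You can hook into a stream to close resources once all of its elements have been consumed: flatMap closes each mapped stream after its contents have been placed into the outer stream, which runs any handlers registered with onClose. So you can close each reader after all of its lines have been read with the following modification: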

.flatMap(reader -> reader.lines().onClose(() -> close(reader)))

where close(AutoCloseable) is a helper that closes the resource and handles the checked exception thrown by close().
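As a minimal, self-contained sketch of this approach applied to the question's pipeline: since HDFS isn't available here, it assumes a hypothetical in-memory stand-in for the file system (a map from file name to contents, read through StringReader), and a hypothetical helper closeUnchecked in place of close(AutoCloseable) that rethrows IOException as UncheckedIOException. A counter records how many readers were closed.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class OnCloseDemo {
    // Counts closed readers, for demonstration only.
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Hypothetical helper: closes a resource, rethrowing IOException unchecked.
    static void closeUnchecked(Closeable c) {
        try {
            c.close();
            CLOSED.incrementAndGet();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical stand-in for fileSystem.openInputStream(fileName).
    static BufferedReader open(Map<String, String> files, String name) {
        return new BufferedReader(new StringReader(files.get(name)));
    }

    static Set<Long> readIds(Map<String, String> files) {
        return files.keySet().stream()
                .map(name -> open(files, name))
                // flatMap closes each inner stream after consuming it,
                // which runs the onClose handler and closes the reader.
                .flatMap(br -> br.lines().onClose(() -> closeUnchecked(br)))
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "1\n2\n");
        files.put("b.txt", " 3 \n4");
        Set<Long> ids = readIds(files);
        System.out.println(ids + ", readers closed: " + CLOSED.get());
    }
}
```

In a sequential pipeline, flatMap closes the inner stream even if a downstream step such as Long.valueOf throws, so the reader currently being read is still closed in that case.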

As a proof of concept, the following code was tested and produced the output below:

import java.util.stream.Stream;

class Test {
    public static void main(String[] args) {
        Stream.of(1, 2, 3).flatMap(i ->
                Stream.of(i, i * 2).onClose(() ->
                        System.out.println("Closed!")
                )
        ).forEach(System.out::println);
    }
}

1
2
Closed!
2
4
Closed!
3
6
Closed!
Kiskae
  • oh, maybe it's what I was looking for – AdamSkywalker Apr 25 '17 at 11:40
  • great, it works – AdamSkywalker Apr 25 '17 at 11:44
  • @Kiskae yes, definitely +1. This is exactly why `onClose` exists: to close resources when you otherwise can't. – Eugene Apr 25 '17 at 12:15
  • @Kiskae a good example of deferring the closing of an `AutoCloseable` resource when it is consumed as a stream. I'd upvote more than once if I could. – holi-java Apr 25 '17 at 12:43
  • This is correct, but if I were performing a code review, I would suggest that a plain loop is far more readable. – VGR Apr 25 '17 at 14:42
  • @VGR It would probably be good to know about this feature if you have a component that returns a Stream object, so it cleans up after use. – Kiskae Apr 25 '17 at 14:44
  • Using the `UncheckedCloseable` from [this answer](https://stackoverflow.com/a/32232173/2711488), you can simply use `.flatMap(br -> br.lines().onClose(UncheckedCloseable.wrap(br)))`. – Holger Apr 25 '17 at 15:29
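A minimal sketch of such a helper, written from scratch in the spirit of the linked answer (its exact code may differ): an interface that is both Runnable and AutoCloseable, so a wrapped resource can be passed straight to onClose. StringReader stands in for the real input streams here.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: an AutoCloseable that is also a Runnable, so it can be passed to onClose.
interface UncheckedCloseable extends Runnable, AutoCloseable {
    @Override
    default void run() {
        try {
            close();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Runnable.run cannot throw checked exceptions, so rethrow unchecked.
            throw new RuntimeException(e);
        }
    }

    // close() is the only abstract method, so a method reference implements it.
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
}

class UncheckedCloseableDemo {
    static List<String> readAll(List<String> contents) {
        return contents.stream()
                .map(text -> new BufferedReader(new StringReader(text)))
                // Each reader is closed when flatMap closes its lines() stream.
                .flatMap(br -> br.lines().onClose(UncheckedCloseable.wrap(br)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList("a\nb", "c")));
    }
}
```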

Why not something a bit simpler, using Files.lines:

Set<Long> ids;
try (Stream<String> s = Files.lines(Paths.get("yourpath", fileName))) {
    ids = s.map(String::trim)
           .map(Long::valueOf)
           .collect(Collectors.toSet());
}
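Files.lines reads a single local file, and as the update to the question says, it doesn't apply to HDFS. On a local file system, though, a whole directory could be read the same way. A sketch, assuming a local directory of ID files; the class name LocalIds and the unchecked wrapper lines(Path) are illustrative. Files.list is closed by try-with-resources, and each stream returned by Files.lines is closed by flatMap once its lines have been consumed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalIds {
    // Files.lines throws a checked IOException, which a lambda passed to
    // flatMap cannot throw, so rethrow it unchecked (an illustrative choice).
    static Stream<String> lines(Path file) {
        try {
            return Files.lines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Set<Long> readIds(Path dir) throws IOException {
        // Files.list holds an open directory handle, so close it with try-with-resources.
        try (Stream<Path> files = Files.list(dir)) {
            return files
                    .filter(Files::isRegularFile)
                    // flatMap closes each Files.lines stream after consuming it,
                    // which closes the underlying file.
                    .flatMap(LocalIds::lines)
                    .map(String::trim)
                    .map(Long::valueOf)
                    .collect(Collectors.toSet());
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo: two small files in a temporary directory.
        Path dir = Files.createTempDirectory("ids");
        Files.write(dir.resolve("a.txt"), Arrays.asList("1", " 2 "));
        Files.write(dir.resolve("b.txt"), Arrays.asList("3"));
        System.out.println(readIds(dir));
    }
}
```

The demo in main leaves its temporary files in place.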
Eugene

I haven't tested the actual code, but maybe something along these lines?

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileSystem::openInputStream)
    .flatMap(is -> {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
            return br.lines().map(String::trim).map(Long::valueOf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    })
    .collect(Collectors.toSet());

It's not as pretty as yours, of course, but I believe it's the closest version that still lets you close the reader.

Luan Nico
  • Won't the I/O stream be closed inside flatMap before the lines are actually read? I need to test that. – AdamSkywalker Apr 25 '17 at 11:21
  • I'm not sure, @AdamSkywalker; that's a very good point. Can you test it in your code? I'm not sure what this fileSystem variable is. – Luan Nico Apr 25 '17 at 11:31
  • Yes, I tested it on the local file system and got java.io.UncheckedIOException: java.io.IOException: Stream closed, so it seems my first comment was correct. – AdamSkywalker Apr 25 '17 at 11:34
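The failure is easy to reproduce without any file system. The try-with-resources block closes the reader as soon as the lambda returns, but br.lines() is lazy and reads nothing until the downstream operations pull lines, by which time the reader is already closed. A minimal sketch, with StringReader standing in for the file streams (an assumption for illustration):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ClosedTooEarly {
    static Set<Long> readIds() {
        return Stream.of("1\n2", "3")
                .flatMap(text -> {
                    // The reader is closed as soon as this block exits...
                    try (BufferedReader br = new BufferedReader(new StringReader(text))) {
                        // ...but lines() is lazy, so nothing has been read yet.
                        return br.lines();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                })
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        try {
            readIds();
        } catch (UncheckedIOException e) {
            // The cause is the IOException thrown when reading from the closed reader.
            System.out.println("Failed: " + e.getCause().getMessage());
        }
    }
}
```

Registering the close with onClose instead, as in the first answer, defers it until flatMap has consumed the lines.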