1

The practical task is to return a Stream read from a file without loading the whole file (or the whole parsed collection) into memory. What the stream is used for can be decided later, e.g. saving to a DB. The developer would then hold a handle to a stream of deserialized objects instead of a deserialized collection.

The difficulty is that there is no guarantee that one line of the file corresponds to one MyEntity object (if it did, I could have used this article: http://blog.codeleak.pl/2014/05/parsing-file-with-stream-api-in-java-8.html).

More generally, given an input stream, one may need to return an output stream in which each item is built from a variable number of input items.

My solution so far uses a Supplier:

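import java.io.InputStream;
import java.util.Scanner;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class Parser {

    public Stream<MyEntity> parse(final InputStream stream) {
        // Stream.generate keeps calling get() forever; it has no notion of "end of input".
        return Stream.generate(new AggregatingSupplier(stream));
    }

    private class AggregatingSupplier implements Supplier<MyEntity> {

        private final Scanner r;

        public AggregatingSupplier(final InputStream source) {
            this.r = new Scanner(source);
        }

        @Override
        public MyEntity get() {
            MyEntity re = new MyEntity();
            // Consume lines until the entity reports that it is complete or the input ends.
            while (r.hasNextLine() && !re.isComplete()) {
                String line = r.nextLine();
                // ... do some processing
            }
            return re;
        }
    }
}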

The problem with this approach is that the stream returned by Stream.generate is infinite: there is no stop condition. Throwing an exception works (somewhat); the alternative is a completely different (classical) approach.

Tagir Valeev
Deroude

2 Answers

5

Instead of a Supplier, consider implementing a custom Spliterator. It is less daunting than the Spliterator interface makes it look, because the Spliterators.AbstractSpliterator base class makes it quite easy: just provide tryAdvance(), which will look basically the same as what you have now in the Supplier.

The halting condition becomes simple: just let tryAdvance() return false.
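A minimal sketch of this idea follows. It assumes, purely for illustration, that an entity ends at a line containing `END`; `SketchEntity`, `EntitySpliterator` and `SketchParser` are hypothetical names standing in for the asker's `MyEntity` and `Parser`, and a trailing entity that never sees its `END` line is still emitted as-is.

```java
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical stand-in for MyEntity: gathers lines until an "END" line (assumed format). */
class SketchEntity {
    private final List<String> lines = new ArrayList<>();
    private boolean complete;

    void accept(String line) {
        if (line.equals("END")) {
            complete = true;
        } else {
            lines.add(line);
        }
    }

    boolean isComplete() {
        return complete;
    }

    List<String> lines() {
        return lines;
    }
}

/** Builds one entity per tryAdvance() call from a variable number of input lines. */
class EntitySpliterator extends Spliterators.AbstractSpliterator<SketchEntity> {
    private final Scanner scanner;

    EntitySpliterator(InputStream source) {
        // The number of entities is unknown up front, hence Long.MAX_VALUE as the estimate.
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        this.scanner = new Scanner(source, "UTF-8");
    }

    @Override
    public boolean tryAdvance(Consumer<? super SketchEntity> action) {
        if (!scanner.hasNextLine()) {
            return false; // no more input: this is the stop condition
        }
        SketchEntity entity = new SketchEntity();
        while (!entity.isComplete() && scanner.hasNextLine()) {
            entity.accept(scanner.nextLine());
        }
        action.accept(entity);
        return true;
    }
}

class SketchParser {
    /** Returns a sequential stream that reads lines only as entities are requested. */
    static Stream<SketchEntity> parse(InputStream source) {
        return StreamSupport.stream(new EntitySpliterator(source), false);
    }
}
```

Calling `SketchParser.parse(in).forEach(...)` on any `InputStream` pulls one entity at a time, and the stream ends as soon as `tryAdvance()` returns false.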

Marko Topolnik
  • and then use [StreamSupport.stream](https://docs.oracle.com/javase/8/docs/api/java/util/stream/StreamSupport.html#stream-java.util.Spliterator-boolean-) to return it. Thanks, I'll give it a try. I wonder if the stream will remain lazy though. I'll do some testing and post back. – Deroude Aug 21 '15 at 10:24
  • Sure it will: all streams are backed by a Spliterator, and you'll get exactly the same behavior with yours. You can also look into [my other post](http://stackoverflow.com/questions/22569040/readerlines-parallelizes-badly-due-to-nonconfigurable-batch-size-policy-in-it) if you're interested in parallelization. – Marko Topolnik Aug 21 '15 at 10:37
2

With my StreamEx library this might be somewhat simpler, as it can partially reduce a stream by combining several adjacent elements based on a specified condition. For example, you can do the following:

public Stream<MyEntity> parse(final InputStream stream) throws IOException {
    return StreamEx.ofLines(new InputStreamReader(stream))
                   .groupRuns((a, b) -> !isEndOfEntry(a))
                   .map(strings -> createMyEntityFromListOfStrings(strings));
}

The groupRuns method accepts a BiPredicate, which is applied to each pair of adjacent lines and should return true if the two lines belong to the same group. If a specific marker identifies the last line of an entry, you can test the first string (a) for it. Alternatively, if it's easier to detect the start of a new entry, you can check the string b. This method creates a StreamEx<List<String>> whose elements are lists of grouped strings, so you can process them to create your MyEntity objects. If you don't want an intermediate List, you can write a Collector which creates your MyEntity and use the collapse(BiPredicate, Collector) method, which accepts the same BiPredicate and any Collector to perform the partial reduction.
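To make the pairwise predicate concrete, here is a plain-Java sketch of the grouping rule described above. It is not StreamEx's implementation: it works eagerly on a List rather than lazily on a stream, and the class name `GroupRunsSketch` as well as the `END` and `id=` markers are assumptions chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class GroupRunsSketch {
    /**
     * Splits items into runs of adjacent elements. For each adjacent pair
     * (previous, next), sameGroup.test(previous, next) == true keeps next in
     * the current run; false starts a new run beginning with next.
     */
    static <T> List<List<T>> groupRuns(List<T> items,
                                       BiPredicate<? super T, ? super T> sameGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T item : items) {
            if (current == null || !sameGroup.test(previous, item)) {
                current = new ArrayList<>();
                groups.add(current);
            }
            current.add(item);
            previous = item;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("id=1", "x", "END", "id=2", "END");
        // End-marker style: stay in the same group unless the first line (a) closes the entry.
        System.out.println(groupRuns(lines, (a, b) -> !a.equals("END")));
        // Start-marker style: stay in the same group unless the second line (b) opens a new entry.
        System.out.println(groupRuns(lines, (a, b) -> !b.startsWith("id=")));
    }
}
```

Both predicates split the sample lines at the same place; which one to use depends on whether the file format marks the end or the start of an entry.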

Tagir Valeev