2

To get familliar with the stream api, I tried to code a quite simple pattern.

Problem: Having a text file containing not nested blocks of text. All blocks are identified by start/endpatterns (e.g. <start> and <stop>. The content of a block isn't syntactically distinguishable from the noise between the blocks. Therefore it is impossible, to work with simple (stateless) lambdas.

I was just able to implement something ugly like:
Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());
To be honest, this is not what I want.

Im looking for a mapper something like:
Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());

is there a good way to extract chunks of data from a java 8 stream seems to contain a skeleton of a solution. Unfortunatly, I'm to stubid to translate that to my problem. ;-)

Any hints?

Community
  • 1
  • 1
Uwe
  • 51
  • 4

1 Answers1

2

Here is a solution which can be used for converting a Stream<String>, each element representing a line, to a Stream<List<String>>, each element representing a chunk found using a specified delimiter:

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
        Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source=lineSpliterator;
        start=chunkStart;
        end=chunkEnd;
        getChunk=s -> {
            if(current!=null) current.add(s);
            else if(start.test(s)) current=new ArrayList<>();
        };
    }
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        while(current==null || current.isEmpty()
                            || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk)) return false;
        current.remove(current.size()-1);
        action.accept(current);
        current=null;
        return true;
    }
    public Spliterator<List<String>> trySplit() {
        return null;
    }
    public long estimateSize() {
        return Long.MAX_VALUE;
    }
    public int characteristics() {
        return ORDERED|NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
        Predicate<String> chunkStart, Predicate<String> chunkEnd,
        boolean parallel) {

        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}

The lines matching the predicates are not included in the chunk; it would be easy to change this behavior, if desired.

It can be used like this:

ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
    Pattern.compile("^<start>$").asPredicate(),
    Pattern.compile("^<stop>$").asPredicate(),
    true )
   .collect(new MyProcessOneBucketCollector<>())

The patterns are specifying as ^word$ to require the entire line to consist of the word only; without these anchors, lines containing the pattern can start and end a chunk. The nature of the source stream does not allow parallelism when creating the chunks, so when chaining with an immediate collection operation the parallelism for the entire operation is rather limited. It depends on the MyProcessOneBucketCollector if there can be any parallelism at all.

If your final result does not depend on the order of occurrences of the buckets in the source file, it is strongly recommended that either your collector reports itself to be UNORDERED or you insert an unordered() in the stream’s method chains before the collect.

Holger
  • 285,553
  • 42
  • 434
  • 765
  • For a better understanding a question on parallelism. Assuming the buckets are `UNORDERED`, but the order of the lines in the List matters, how to force a paralel execution of the collector?
    ´ChunkSpliterator.toChunks(...).unordered().parallelStream().collect(..)` ???
    – Uwe Oct 20 '14 at 15:00
  • 1
    The order of lines within a bucket will always be retained as the bucket `List`s are always created single-threaded (as I said, the nature of the source stream does not allow parallelism) and `List`s will not suddenly change their element order afterwards. So calling `.unordered()` on the result of `toChunks` may only retract the order of the buckets. Note that you don’t need to call `.parallelStream()`, passing `true` to the method `.toChunks(…)` will create a parallel stream already. – Holger Oct 20 '14 at 15:13
  • Vielen Dank mein Meister. :-) – Uwe Oct 21 '14 at 07:46