
I'm processing a potentially infinite stream of data elements that follow the pattern:

E1 <start mark> E2 foo E3 bah ... En-1 bar En <end mark>

That is, a stream of Strings that must be accumulated in a buffer before I can map them to my object model.

Goal: aggregate a Stream<String> into a Stream<ObjectDefinedByStrings> without the overhead of collecting on an infinite stream.

In English, the logic would be something like: "Once you see a start marker, start buffering. Buffer until you see an end marker, then prepare a fresh buffer and return the old one."

My current implementation has the form:

Data<String>.stream()
            .map(functionReturningAnOptionalPresentOnlyIfObjectIsComplete)
            .filter(Optional::isPresent)

I have several questions:

  1. What is this operation properly called? (i.e. what can I Google for more examples? Every discussion I find of .map() talks about 1:1 mapping. Every discussion of .reduce() talks about n:1 reduction. Every discussion of .collect() talks about accumulating as a terminal operation...)

  2. This seems bad in many different ways. Is there a better way of implementing this? (A candidate of the form .collectUntilConditionThenApplyFinisher(Collector,Condition,Finisher)...?)

Thanks!

Tagir Valeev
limbo

  • This seems like a _very bad idea._ `map` operations should be side-effect-free. What you almost certainly should be doing is calling `Stream.iterator()` and doing this in an "old school" fashion by moving the iterator until you hit the `<end mark>`. – Louis Wasserman Dec 03 '14 at 23:38
  • More or less, streams aren't intended to be used this way. Iterators are a more reasonable way to go. – Louis Wasserman Dec 03 '14 at 23:44
  • *facepalm* This is exactly right. #FluCoding – limbo Dec 04 '14 at 03:22
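
A minimal sketch of the iterator approach suggested in the comments above, assuming each element is a String that may contain the start or end marker. The class name `MarkerGroupingIterator` and the helper `group` are hypothetical, not part of any library. Elements outside a start/end pair are skipped, and an unfinished trailing group is dropped; both choices are assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical helper: lazily groups marker-delimited strings into buffers. */
class MarkerGroupingIterator implements Iterator<List<String>> {
    private final Iterator<String> source;
    private final String startMark;
    private final String endMark;
    private List<String> ready; // a complete group waiting to be returned, or null

    MarkerGroupingIterator(Iterator<String> source, String startMark, String endMark) {
        this.source = source;
        this.startMark = startMark;
        this.endMark = endMark;
    }

    @Override
    public boolean hasNext() {
        if (ready == null) {
            ready = readGroup();
        }
        return ready != null;
    }

    @Override
    public List<String> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<String> group = ready;
        ready = null;
        return group;
    }

    /** Reads one complete group, or returns null if the source ends first. */
    private List<String> readGroup() {
        List<String> buffer = null;
        while (source.hasNext()) {
            String s = source.next();
            if (s.contains(startMark)) {
                buffer = new ArrayList<>(); // start (or restart) buffering
            }
            if (buffer != null) {
                buffer.add(s);
                if (s.contains(endMark)) {
                    return buffer; // group is complete
                }
            }
        }
        return null; // assumption: an incomplete trailing group is dropped
    }

    /** Wraps the iterator back into a lazy, sequential Stream. */
    static Stream<List<String>> group(Stream<String> in, String startMark, String endMark) {
        Iterator<List<String>> it = new MarkerGroupingIterator(in.iterator(), startMark, endMark);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        Stream<String> input = Stream.of(
                "noise", "<start mark> a", "b", "c <end mark>", "<start mark> d <end mark>");
        group(input, "<start mark>", "<end mark>").forEach(System.out::println);
    }
}
```

Because the iterator only pulls from the source when a group is requested, this should also work on an infinite source as long as the downstream pipeline short-circuits (for example with `limit`).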

3 Answers


Unfortunately, there's no partial reduce operation in the Java 8 Stream API. However, such an operation is implemented in my StreamEx library, which enhances the standard Java 8 Streams. So your task can be solved like this:

Stream<ObjectDefinedByStrings> result = 
    StreamEx.of(strings)
            .groupRuns((a, b) -> !b.contains("<start mark>"))
            .map(stringList -> constructObjectDefinedByStrings(stringList));

Here strings is a normal Java 8 stream or another source such as an array, Collection, Spliterator, etc. This works fine with infinite or parallel streams. The groupRuns method takes a BiPredicate which is applied to two adjacent stream elements and returns true if these elements must be grouped. Here we say that elements should be grouped unless the second one contains "<start mark>" (which is the start of a new group). After that you will get a stream of List<String> elements.
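
To make the adjacent-pair rule concrete, here is a small plain-Java illustration of the grouping semantics described above. It is an eager, list-based sketch and not how StreamEx implements groupRuns; the class name `GroupRunsSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class GroupRunsSketch {
    /** Eager illustration only: starts a new group wherever sameGroup(previous, current) is false. */
    static <T> List<List<T>> groupRuns(List<T> items, BiPredicate<? super T, ? super T> sameGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;
        T previous = null;
        for (T item : items) {
            // The first element always opens a group; later ones are compared with their predecessor.
            if (current == null || !sameGroup.test(previous, item)) {
                current = new ArrayList<>();
                groups.add(current);
            }
            current.add(item);
            previous = item;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList(
                "<start mark> a", "b", "c <end mark>", "<start mark> d", "e <end mark>");
        // Same predicate as in the answer: group unless the second element starts a new object.
        System.out.println(groupRuns(strings, (a, b) -> !b.contains("<start mark>")));
    }
}
```

With this input the five strings fall into two groups, split just before the second "<start mark>" element. Note that the predicate only ever sees two neighbours, never the group built so far.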

If collecting to the intermediate lists is not appropriate for you, you can use the collapse(BiPredicate, Collector) method and specify the custom Collector to perform the partial reduction. For example, you may want to join all the strings together:

Stream<ObjectDefinedByStrings> result = 
    StreamEx.of(strings)
            .collapse((a, b) -> !b.contains("<start mark>"), Collectors.joining())
            .map(joinedString -> constructObjectDefinedByStrings(joinedString));
Tagir Valeev

To avoid your kludge you could filter before mapping.

Data<String>.stream()
    .filter(text -> canBeConvertedToObject(text))
    .map(text -> convertToObject(text))

That works perfectly well on an infinite stream and only constructs objects that need to be constructed. It also avoids the overhead of creating unnecessary Optional objects.

sprinter

I propose 2 more use cases for this partial reduction:

1. Parsing SQL and PL/SQL (Oracle procedural) statements

The standard delimiter for SQL statements is the semicolon (;). It separates normal SQL statements from each other. But in a PL/SQL statement the semicolon also separates the operators inside the statement, not only whole statements.

One way to parse a script file containing both normal SQL and PL/SQL statements is to first split it by semicolon and then, if a particular statement starts with specific keywords (DECLARE, BEGIN, etc.), join this statement with the statements that follow it, according to the rules of the PL/SQL grammar.

By the way, this cannot be done with the StreamEx partial reduce operations, since they only test two adjacent elements. Here you need to know about the previous stream elements, starting from the initial PL/SQL keyword element, to decide whether to include the current element in the partial reduction or to finish the partial reduction. In this case a mutable partial reduction may be usable: a collector holding the already collected elements, plus either a Predicate testing only the accumulator (to decide whether the partial reduction should be finished) or a BiPredicate testing both the accumulator and the current stream element.

In theory, we're talking about implementing an LR(0) or LR(1) parser (see https://en.wikipedia.org/wiki/LR_parser) in the Stream pipeline style. An LR parser can be used to parse the syntax of most programming languages.

A parser is a finite automaton with a stack. For an LR(0) automaton, the transition depends on the stack only. For an LR(1) automaton, it depends on both the stack and the next element from the stream (in theory there can be LR(2), LR(3), etc. automata that peek at the next 2, 3, etc. elements to determine the transition, but in practice LR(1) is enough for the syntax of most programming languages).

To implement the parser there should be a Collector whose accumulator holds the automaton's stack, and a predicate testing whether the final state of the automaton has been reached (so we can stop the reduction). For LR(0) it should be a Predicate testing the accumulator itself. For LR(1) it should be a BiPredicate testing both the accumulator and the next element from the stream (since the transition depends on both the stack and the next symbol).

So to implement an LR(0) parser we would need something like the following (T is the stream element type, A is the accumulator holding both the automaton's stack and the result, R is the result of each parser run, forming the output stream):

<R,A> Stream<R> Stream<T>.parse(
    Collector<T,A,R> automataCollector,
    Predicate<A> isFinalState)

(I removed complexity like ? super T instead of T for compactness; the resulting API should contain these.)
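
As a rough sketch of how the proposed LR(0)-style operation could behave, here is a static helper with the same parameters, built on a plain Iterator. Nothing here is an existing Stream API method: `PartialReduceSketch`, `parse` and `listCollector` are hypothetical names, and the sketch assumes a sequential stream, accepts at least one element per result, and emits a trailing group even if the final state was never reached.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PartialReduceSketch {
    /** Hypothetical "parse": feeds elements into a fresh accumulator until isFinalState holds. */
    static <T, A, R> Stream<R> parse(Stream<T> input,
                                     Collector<T, A, R> automataCollector,
                                     Predicate<A> isFinalState) {
        Iterator<T> source = input.iterator();
        Iterator<R> results = new Iterator<R>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public R next() {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                A acc = automataCollector.supplier().get();
                // Consume at least one element, then stop as soon as the final state is reached.
                do {
                    automataCollector.accumulator().accept(acc, source.next());
                } while (!isFinalState.test(acc) && source.hasNext());
                // Assumption: if the source ends early, the unfinished accumulator is still emitted.
                return automataCollector.finisher().apply(acc);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(results, Spliterator.ORDERED), false);
    }

    /** A list collector whose accumulator type is visible, so the predicate can inspect it. */
    static <T> Collector<T, List<T>, List<T>> listCollector() {
        return Collector.of(ArrayList::new, List::add, (l, r) -> { l.addAll(r); return l; });
    }

    public static void main(String[] args) {
        Stream<String> input = Stream.of(
                "<start mark> a", "b", "c <end mark>", "<start mark> d <end mark>");
        parse(input, PartialReduceSketch.<String>listCollector(),
                acc -> acc.get(acc.size() - 1).contains("<end mark>"))
                .forEach(System.out::println);
    }
}
```

The predicate sees the whole accumulator, so it can depend on everything collected so far, which is exactly what a test on two adjacent elements cannot express. `Collectors.toList()` is not used in the example because it hides its accumulator type behind a wildcard.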

To implement LR(1) parser we would need something like following:

<R,A> Stream<R> Stream<T>.parse(
    BiPredicate<A, T> isFinalState,
    Collector<T,A,R> automataCollector)

NOTE: In this case the BiPredicate should test an element before it is consumed by the accumulator. Remember that an LR(1) parser peeks at the next element to determine the transition. So there is a potential error case when an empty accumulator refuses to accept the next element (that is, the BiPredicate returns true, signaling that the partial reduction is over, for an empty accumulator just created by the Supplier and the next stream element).

2. Conditional batching based on stream element type

When we're executing SQL statements we want to merge adjacent data-modification (DML) statements into a single batch (see the JDBC API) to improve overall performance. But we don't want to batch queries. So we need conditional batching (instead of unconditional batching as in "Java 8 Stream with batch processing").

For this specific case the StreamEx partial reduce operations can be used: if both adjacent elements tested by the BiPredicate are DML statements, they belong in the same batch. So we don't need to know the previous history of the batch collection.

But we can make the task more complex and say that batches should be limited in size: say, no more than 100 DML statements per batch. In this case we cannot ignore the previous batch collection history, and a BiPredicate on two adjacent elements is not enough to decide whether batch collection should continue or stop.

We could add a flatMap after the StreamEx partial reduction to split long batches into parts. But this would delay execution of each 100-element batch until all adjacent DML statements have been collected into one unlimited batch. Needless to say, this goes against the pipeline idea: we want to minimize buffering to maximize speed between input and output. Moreover, unlimited batch collection may result in an OutOfMemoryError for a very long run of DML statements without any queries in between (say, a million INSERTs produced by a database export), which is intolerable.

So for this complex conditional batch collection with an upper limit we also need something as powerful as the LR(0) parser described in the previous use case.
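
A minimal sketch of such size-limited conditional batching, written with a plain Iterator. It uses the lookahead flavour (a BiPredicate on the current batch and the next element), because a query has to close a DML batch without joining it. All names (`BoundedBatchingSketch`, `batch`, `isDml`, `batchStatements`) are hypothetical, and the DML check is deliberately naive.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class BoundedBatchingSketch {
    /** Hypothetical helper: closes the current batch when closeBefore(batch, next) is true. */
    static <T> Stream<List<T>> batch(Stream<T> input, BiPredicate<List<T>, T> closeBefore) {
        Iterator<T> source = input.iterator();
        Iterator<List<T>> batches = new Iterator<List<T>>() {
            private T pending;          // element that was peeked but belongs to the next batch
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                return hasPending || source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> current = new ArrayList<>();
                // The first element is always accepted, so an empty batch never rejects its input.
                current.add(hasPending ? pending : source.next());
                hasPending = false;
                pending = null;
                while (source.hasNext()) {
                    T candidate = source.next();
                    if (closeBefore.test(current, candidate)) {
                        pending = candidate; // keep it for the next batch
                        hasPending = true;
                        break;
                    }
                    current.add(candidate);
                }
                return current;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batches, Spliterator.ORDERED), false);
    }

    /** Naive classification, for illustration only. */
    static boolean isDml(String sql) {
        String s = sql.trim().toUpperCase();
        return s.startsWith("INSERT") || s.startsWith("UPDATE") || s.startsWith("DELETE");
    }

    static Stream<List<String>> batchStatements(Stream<String> statements, int maxBatchSize) {
        return batch(statements, (current, next) ->
                current.size() >= maxBatchSize      // size limit reached
                        || !isDml(next)             // queries are never added to a batch
                        || !isDml(current.get(0))); // current batch is a single query
    }

    public static void main(String[] args) {
        Stream<String> script = Stream.of(
                "INSERT 1", "INSERT 2", "INSERT 3", "SELECT 1", "UPDATE 1", "DELETE 1");
        batchStatements(script, 2).forEach(System.out::println);
    }
}
```

Each batch is emitted as soon as the element after it has been read, so at most `maxBatchSize` statements plus one lookahead element are buffered at a time. The one-element lookahead is a trade-off of this sketch; a variant that checks the size limit before reading could emit full batches without it.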

UNV