
I have a stream of strings and nulls, like:

Stream<String> str1 = Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);

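I want to reduce it to another stream in which every sequence of non-null strings terminated by a null is joined together (a null that directly follows another null yields an empty string), i.e. like:

Stream<String> str2 = Stream.of("ABC", "", "D", "EF","G");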

The first way I found is to create a collector that first reduces the complete input stream to a single object holding a list of all joined strings, and then creates a new stream from it:

class Acc1 {
  final private List<String> data = new ArrayList<>();
  final private StringBuilder sb = new StringBuilder();

  private void accept(final String s) {
    if (s != null) 
      sb.append(s);
    else {
      data.add(sb.toString());
      sb.setLength(0);
    }
  }

  public static Collector<String,Acc1,Stream<String>> collector() {
    return Collector.of(Acc1::new, Acc1::accept, (a,b)-> a, acc -> acc.data.stream());
  }
}
...
Stream<String> str2 = str1.collect(Acc1.collector());

But in this case the input stream is processed completely before any use of str2, even str2.findFirst(). This costs time and memory, and on an infinite stream from some generator it will not work at all.
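The eager behaviour described above can be made visible with a counter. The following is only a minimal sketch: EagerCollectDemo, Acc and consumedBeforeFirstResult are hypothetical names introduced for illustration, and Acc mirrors the Acc1 class from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Stream;

final class EagerCollectDemo {
    // Same idea as Acc1 in the question: every completed group is buffered in a list.
    static final class Acc {
        private final List<String> data = new ArrayList<>();
        private final StringBuilder sb = new StringBuilder();

        void accept(String s) {
            if (s != null) {
                sb.append(s);
            } else {
                data.add(sb.toString());
                sb.setLength(0);
            }
        }
    }

    // The combiner (a, b) -> a is only a placeholder; it is not usable for parallel streams.
    static Collector<String, Acc, Stream<String>> collector() {
        return Collector.of(Acc::new, Acc::accept, (a, b) -> a, acc -> acc.data.stream());
    }

    // Counts how many source elements were pulled before findFirst() could answer.
    static int consumedBeforeFirstResult() {
        AtomicInteger consumed = new AtomicInteger();
        Stream<String> source =
                Stream.of("A", "B", "C", null, null, "D", null, "E", "F", null, "G", null)
                      .peek(s -> consumed.incrementAndGet());
        Stream<String> joined = source.collect(collector());
        joined.findFirst();
        return consumed.get();
    }
}
```

Although only the first group is requested, the counter reports all twelve source elements, because collect() is a terminal operation on the source stream.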

Another way is to create an external object that keeps the intermediate state and use it in flatMap():

class Acc2 {
  final private StringBuilder sb = new StringBuilder();

  Stream<String> accept(final String s) {
    if (s != null) {
      sb.append(s);
      return Stream.empty();
    } else {
      final String result = sb.toString();
      sb.setLength(0);
      return Stream.of(result);
    }
  }
}
...
Acc2 acc = new Acc2();
Stream<String> str2 = str1.flatMap(acc::accept);

In this case only the elements that are actually accessed via str2 are retrieved from str1.

But using an external object, created outside of the stream processing, looks ugly to me and can probably cause side effects that I do not see now. Also, if str2 is later processed in parallel (e.g. after calling parallel()), the result will be unpredictable.

Is there a more correct implementation of stream->stream reduction without these flaws?
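One concrete side effect of the stateful flatMap() variant can be checked directly: as written, it only emits a group when it sees a null, so a trailing group that is not followed by a null appears to be silently dropped. The sketch below assumes hypothetical names (StatefulFlatMapDemo, Acc, join); Acc mirrors Acc2 from the question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StatefulFlatMapDemo {
    // Mirrors Acc2 from the question: state lives outside the stream pipeline.
    static final class Acc {
        private final StringBuilder sb = new StringBuilder();

        Stream<String> accept(String s) {
            if (s != null) {
                sb.append(s);
                return Stream.empty();
            }
            String result = sb.toString();
            sb.setLength(0);
            return Stream.of(result);
        }
    }

    // Sequential use only; the shared StringBuilder is not safe for parallel streams.
    static List<String> join(Stream<String> source) {
        Acc acc = new Acc();
        return source.flatMap(acc::accept).collect(Collectors.toList());
    }
}
```

With the input "A", "B", null, "C" this yields only the group "AB"; the trailing "C" stays in the StringBuilder because flatMap() gives the function no end-of-stream signal.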

Stefan Zobel
rustot
  • There is a library called "StreamEx" that contains a groupRuns method (I do not remember the exact name). Using it you can convert your stream to a stream of filled and empty lists. The next move is obvious. – talex Oct 31 '16 at 11:21
  • Do you start off with a `Stream` or with some `Collection`? That would leave the possibility of using the spliterator – Jorn Vernee Oct 31 '16 at 11:39
  • This may interest you: http://stackoverflow.com/questions/29095967/splitting-list-into-sublists-along-elements/29096777 – Alexis C. Oct 31 '16 at 11:47
  • You could do a reduction (https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#reduce-U-java.util.function.BiFunction-java.util.function.BinaryOperator-) with Identity function being the empty List; the accumulator a function that string-concatenates the current element if not null to the last element of the accumulator if this is not an empty String, otherwise start a new String with the current element as the value, if current element is null, append "" and so on; the combiner being the list concatenation (addAll) – Josep Prat Oct 31 '16 at 12:08
  • @Josep Prat: you should *never* use `reduce` with functions that modify their arguments. The `collect` method, as already used by the OP, is the right approach for mutable reduction. The fact that reduction will process all elements doesn’t change, whether you use `reduce` or `collect`. – Holger Oct 31 '16 at 12:19
  • @Holger I never said you should modify the arguments; you take the acc from the accumulator function and you return a new accumulator with this new value changed, it doesn't need to mutate or modify its arguments. This is a common pattern in functional programming, I don't see the problem you are stating – Josep Prat Oct 31 '16 at 12:48
  • @Josep Prat: you started with an empty `List` as “Identity function” and omitted how this list plays together with the strings in the accumulator function, but obviously had modifying the `List` in mind, further you wrote “*the combiner being the list concatenation (addAll)*” and what is `List.addAll`, if not a function that modifies one of the input lists? – Holger Oct 31 '16 at 12:53
  • @Holger It's a comment and not an answer and there is a character limitation, I wasn't clear enough. Concatenation should return a new instance of the contents of both lists, so constructor with collection + addAll. And I do agree with you that you should not mutate the accumulator on reduce. – Josep Prat Oct 31 '16 at 12:58
  • Ok, your comment is not supposed to be a complete solution, but why do you suggest using `reduce` at all; what’s the supposed advantage over the `Collector` based solution that the OP has already posted in the question? – Holger Oct 31 '16 at 13:02
  • I guess I'm biased towards using reduce because many other languages don't differentiate between reduce and collect (only reduce is available) and aim to work with immutable structures (for example Haskell, Scala...) – Josep Prat Oct 31 '16 at 13:30
  • @Josep Prat: Sure, when you have immutable data structures only, there is no need to differentiate. That also implies that the language’s library will provide the necessary tools to deal with these structures (like a list concatenation that returns a new list, but may somehow get optimized under the hood at runtime to reduce or even eliminate the copying overhead). But here, the question wasn’t how to do the reduction… – Holger Oct 31 '16 at 13:44
  • @Holger I agree – Josep Prat Oct 31 '16 at 13:46

2 Answers


Reduction or its mutable variant, collect, is always an operation that will process all items. Your operation can be implemented via a custom Spliterator, e.g.

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private StringBuilder sb = new StringBuilder();
            private String last;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> last=str))
                    return false;
                while(last!=null) {
                    sb.append(last);
                    if(!sp.tryAdvance(str -> last=str)) break;
                }
                action.accept(sb.toString());
                sb=new StringBuilder();
                return true;
            }
        }, false);
}

which produces the intended groups, as you can test with

joinGroups(Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null))
    .forEach(System.out::println);

but also has the desired lazy behavior, testable via

joinGroups(
    Stream.of("A","B","C",null,null,"D",null,"E","F",null,"G",null)
          .peek(str -> System.out.println("consumed "+str))
).skip(1).filter(s->!s.isEmpty()).findFirst().ifPresent(System.out::println);

After a second thought, I came to this slightly more efficient variant. It will incorporate the StringBuilder only if there are at least two Strings to join, otherwise, it will simply use the already existing sole String instance or the literal "" string for empty groups:

public static Stream<String> joinGroups(Stream<String> s) {
    Spliterator<String> sp=s.spliterator();
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<String>(sp.estimateSize(), 
        sp.characteristics()&Spliterator.ORDERED | Spliterator.NONNULL) {
            private String next;

            public boolean tryAdvance(Consumer<? super String> action) {
                if(!sp.tryAdvance(str -> next=str))
                    return false;
                String string=next;
                if(string==null) string="";
                else if(sp.tryAdvance(str -> next=str) && next!=null) {
                    StringBuilder sb=new StringBuilder().append(string);
                    do sb.append(next);while(sp.tryAdvance(str -> next=str) && next!=null);
                    string=sb.toString();
                }
                action.accept(string);
                return true;
            }
        }, false);
}
rajadilipkolli
Holger
  • Thank you, this is exactly what I was looking for: a native method instead of tricks. By the way, iterator() from the input stream instead of spliterator() may be more convenient – rustot Nov 01 '16 at 07:33
  • @rustot: It’s tempting to use an `Iterator`, because we are more familiar with it, as it is much older, and in this specific case we might get away with it, but more than often, implementing a `Spliterator` atop a source `Spliterator` is much more straight-forward and potentially more efficient, as we pass a size estimate and characteristics to the result stream. Further, most source streams will be more efficient when traversed via `Spliterator`, which doesn’t have its logic split between a `hasNext` and `next` method… – Holger Nov 01 '16 at 09:13
  • That is 2 invocations (hasNext + next) vs. 2 invocations (tryAdvance + accept) plus a temporary instance creation for the lambda. But I do not care about efficiency; an iterator allows a look-ahead ("is this the last element in the stream?") and special processing for it – rustot Nov 02 '16 at 08:25
  • @rustot: I’m not considering just the number of invocations. It’s more about how the underlying source has to implement it. See for example `Files.lines`, which is backed by a single method `BufferedReader.readLine`, which will return the next line or `null`, which matches exactly the `tryAdvance` logic. In contrast, an `Iterator` has to poll the next line in `hasNext` and remember it, so that `next` can return it, but it also has to ensure that multiple invocations of `hasNext` without `next` don’t advance and that calling `next` without `hasNext` also does the right thing. – Holger Nov 02 '16 at 08:46
  • In other words, the “look-ahead” of `hasNext` is a disguise here. It’s actually a polling for the next item, just like `tryAdvance`, followed by a storage operation, so the subsequent `next` call can retrieve it. But a simple `if(hasNext()) next()` bears *five* conditionals. First, `hasNext` has to check whether `hasNext` has already been called, then it will poll the next item, check the result and return the condition, then, the caller evaluates the `boolean` result and calls `next`, which will again check whether `hasNext` has been called and to what result. – Holger Nov 02 '16 at 08:50
  • Even if the stream’s source is an `Iterator`, you will never get that `Iterator` when calling `Stream.iterator()`. You’ll get an `Iterator` wrapping a `Spliterator`, wrapping the `Iterator`… By the way, if the temporary instances created for the lambda expressions are your concern, it’s easy to change the class to implement `Consumer`, to get rid of the lambda expression completely. Of course, the class can’t be an anonymous class then. – Holger Nov 02 '16 at 09:04
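The last comment's suggestion (a named class that implements Consumer itself, so no lambda is needed) might look roughly like the following. This is a sketch under assumptions rather than the answerer's own code: the class name GroupJoiner is hypothetical, and the logic follows the second joinGroups variant from the answer.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical named variant: the spliterator is its own Consumer,
// so the source can hand elements to it without a lambda instance.
final class GroupJoiner extends Spliterators.AbstractSpliterator<String>
        implements Consumer<String> {

    private final Spliterator<String> source;
    private String next; // most recent element received from the source

    private GroupJoiner(Spliterator<String> source) {
        super(source.estimateSize(),
              source.characteristics() & Spliterator.ORDERED | Spliterator.NONNULL);
        this.source = source;
    }

    @Override
    public void accept(String s) {
        next = s;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        if (!source.tryAdvance(this)) {
            return false; // source exhausted, no further group
        }
        String string = next;
        if (string == null) {
            string = ""; // a null directly after a null: empty group
        } else if (source.tryAdvance(this) && next != null) {
            // at least two strings to join: only now allocate a StringBuilder
            StringBuilder sb = new StringBuilder(string);
            do {
                sb.append(next);
            } while (source.tryAdvance(this) && next != null);
            string = sb.toString();
        }
        action.accept(string);
        return true;
    }

    static Stream<String> joinGroups(Stream<String> s) {
        return StreamSupport.stream(new GroupJoiner(s.spliterator()), false);
    }
}
```

Because the groups are produced on demand, this also works on an unbounded source; for example, `GroupJoiner.joinGroups(Stream.iterate(0, i -> i + 1).map(i -> i % 3 == 2 ? null : "x")).limit(2)` only pulls as many source elements as the two requested groups need.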

It's quite hard to implement such scenarios using the standard Stream API. In my free StreamEx library I extended the standard Stream interface with methods that allow performing a so-called "partial reduction", which is exactly what is necessary here:

StreamEx<String> str1 = StreamEx.of("A","B","C",null,null,"D",null,"E","F",null,"G",null);
Stream<String> str2 = str1.collapse((a, b) -> a != null,
                          MoreCollectors.filtering(Objects::nonNull, Collectors.joining()));
str2.map(x -> '"'+x+'"').forEach(System.out::println);

Output:

"ABC"
""
"D"
"EF"
"G"

The StreamEx.collapse() method performs a partial reduction of the stream using the supplied collector. The first argument is a predicate that is applied to two adjacent original items and should return true if they must be reduced together. Here we just require that the first of the pair is not null ((a, b) -> a != null): this means that every group ends with a null and a new group starts after it. Now we need to join the group's letters together: this can be done with the standard Collectors.joining() collector. However, we also need to filter out the null. We can do it using the MoreCollectors.filtering collector (the same collector is available in the Collectors class since Java 9).

This implementation is completely lazy and quite friendly to parallel processing.
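The per-group collector can be tried on its own with the JDK's Collectors.filtering, which needs Java 9 or later. The sketch below uses hypothetical names (FilteringJoinDemo, joinNonNull) and covers only the joining of a single group, not the StreamEx collapse() step.

```java
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FilteringJoinDemo {
    // Joins the non-null strings of one group; the terminating null is filtered out first.
    // Without the filter, Collectors.joining() would append the text "null".
    static String joinNonNull(Stream<String> group) {
        return group.collect(
                Collectors.filtering(Objects::nonNull, Collectors.joining()));
    }
}
```

For a group consisting of "E", "F" and the terminating null this gives "EF", and for a group that is just a single null it gives the empty string.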

Tagir Valeev