
I would like to create a method that performs some complex operations on a stream (e.g. replacing the 7th element, removing the last element, removing adjacent duplicates, etc.) without caching the whole stream.

But which stream API lets me plug such a method in? Do I have to create my own collector that, while collecting, emits items to some other stream? But that would change the data-flow direction from pull to push, right?

What's a possible signature for such a method?

<T> Stream<T> process(Stream<T> in)

is probably impossible (in single-threaded code) because the result could be returned only after collecting the whole input stream.

Another idea:

<T> void process(Stream<T> in, Stream<T> out)

also seems a bit flawed, because Java doesn't allow inserting items into an existing stream (provided as the out parameter).

So how can I do complex stream processing in Java?

piotrek
    Your examples require a collection, not a stream: if you don't know the number of elements, you can't remove the last one, and a stream can be processed by multiple threads, so again removing adjacent duplicates is not possible. I think you could somehow do only the removal of e.g. the 7th element. – Krzysztof Krasoń Aug 02 '16 at 11:55
  • Well, `distinct` somehow removes adjacent duplicates, so apparently it is possible. But I agree that removing the last element may not be properly defined. – piotrek Aug 02 '16 at 12:04
  • `distinct` is a simple algorithm, it works just like the Linux `uniq` command. All you need to do is keep track of your previously seen value. If the current value is different, record it as your previous value. If it's the same, skip this element and continue on. At most, you are looking at two consecutive elements at any given time. Your requirements make presumptions about the `Stream` that may not be true and you wouldn't be able to find out until you processed the `Stream`s. – nickb Aug 02 '16 at 12:11
  • I think `distinct` removes all duplicates, but it collects the whole stream. Removing adjacent duplicates in single-threaded processing is possible without collecting the entire stream. How can I create such a function using the Java 8 API? – piotrek Aug 02 '16 at 12:22
    To do this properly, you would need to implement a custom Spliterator that would rely on the spliterator of the input Stream, but this is going to be very tricky - the API wasn't really designed for those kinds of operations. – Tunaki Aug 02 '16 at 12:50

4 Answers

The complex operations you use as examples all follow the pattern of an operation on one element in the stream depending on other elements in the stream. Java streams are specifically designed to not allow these types of operations without a collection or reduction. Stream operations do not allow direct access to other members and, in general, non-terminal operations with side-effects are a bad idea.

Note the following from the Stream javadoc:

Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. By contrast, streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed in aggregate on that source.

More specifically:

Most stream operations accept parameters that describe user-specified behavior ... To preserve correct behavior, these behavioral parameters:

must be non-interfering (they do not modify the stream source); and in most cases must be stateless (their result should not depend on any state that might change during execution of the stream pipeline).

and

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline

All the complexities of intermediate and terminal, stateless and stateful operations are well described at https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html and http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html

This approach has both advantages and disadvantages. A significant advantage is that it allows parallel processing of streams. A significant disadvantage is that operations that are easy in some other languages (such as skipping every third element in the stream) are difficult in Java.
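When materializing the input is acceptable, the "collection" route turns such a position-dependent operation into a stateless one over indices. A minimal sketch, assuming the whole stream fits in memory; the class and method names (SkipEveryThird, skipEveryThird) are hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SkipEveryThird {
    // Hypothetical helper: drops the elements at positions 2, 5, 8, ... (every third one).
    // The input is collected into a List first, so the index test below is stateless
    // and would also be safe on a parallel IntStream.
    public static <T> Stream<T> skipEveryThird(Stream<T> in) {
        List<T> list = in.collect(Collectors.toList());
        return IntStream.range(0, list.size())
                .filter(i -> i % 3 != 2)   // keep indices 0,1, 3,4, 6,7, ...
                .mapToObj(list::get);
    }

    public static void main(String[] args) {
        skipEveryThird(Stream.of(1, 2, 3, 4, 5, 6, 7))
                .forEach(System.out::println); // prints 1, 2, 4, 5, 7 (one per line)
    }
}
```

The price is exactly what the question wanted to avoid: the whole stream is cached before any output is produced.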

Note that you will see a lot of code (including accepted answers on SO) that ignores the advice that behavioural parameters of stream operations should be stateless. To work, this code relies on behaviour of an implementation of Java that is not defined by the language specification: namely, that streams are processed in order. There is nothing in the specification stopping an implementation of Java from processing elements in reverse order or random order. Such an implementation would make any stateful stream operations immediately behave differently. Stateless operations would continue to behave exactly the same. So, to summarise, stateful operations rely on details of the implementation of Java rather than the specification.

Also note that it is possible to have safe stateful intermediate operations. They need to be designed so that they specifically do not rely on the order in which elements are processed. Stream.distinct and Stream.sorted are good examples of this. They need to maintain state to work, but they are designed to work irrespective of the order in which elements are processed.
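A small check of that claim, using hypothetical helper names: for an ordered source, distinct() and sorted() give the same result whether the stream is sequential or parallel. The javadoc of distinct() states that for ordered streams the first occurrence in encounter order is kept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SafeStatefulOps {
    // Hypothetical helper: distinct() on either a sequential or a parallel stream.
    public static <T> List<T> distinctOf(List<T> data, boolean parallel) {
        Stream<T> s = parallel ? data.parallelStream() : data.stream();
        // For ordered streams, distinct() is stable: the first occurrence wins.
        return s.distinct().collect(Collectors.toList());
    }

    // Hypothetical helper: sorted() on either a sequential or a parallel stream.
    public static <T extends Comparable<? super T>> List<T> sortedOf(List<T> data, boolean parallel) {
        Stream<T> s = parallel ? data.parallelStream() : data.stream();
        return s.sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(3, 1, 3, 2, 1, 2);
        System.out.println(distinctOf(data, false) + " " + distinctOf(data, true)); // [3, 1, 2] [3, 1, 2]
        System.out.println(sortedOf(data, false) + " " + sortedOf(data, true));     // [1, 1, 2, 2, 3, 3] [1, 1, 2, 2, 3, 3]
    }
}
```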

So to answer your question, these types of operations are possible to do in Java but they are not simple, safe (for the reason given in the previous paragraph) or a natural fit for the language design. I suggest using reduction or collection or (see Tagir Valeev's answer) a spliterator to create a new stream. Alternatively use traditional iteration.
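For the "traditional iteration" route, a single-threaded lazy pipeline can still expose the `Stream<T> process(Stream<T> in)` signature from the question: wrap the input's iterator in a new Iterator and turn that back into a stream. This is a sketch rather than part of the answer; the names IteratorOps and dropLast are hypothetical, and it assumes (as the JDK does in practice) that Stream.iterator() pulls elements on demand. It implements the question's "remove last element" with only one element of lookahead.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorOps {
    // Hypothetical helper: lazily drops the last element of a stream.
    // Only one read-ahead element is buffered; the input is never cached as a whole.
    public static <T> Stream<T> dropLast(Stream<T> in) {
        Iterator<T> src = in.iterator();
        Iterator<T> it = new Iterator<T>() {
            private T pending;          // element read ahead but not yet emitted
            private boolean hasPending; // whether 'pending' holds a real element

            private void fill() {
                if (!hasPending && src.hasNext()) {
                    pending = src.next();
                    hasPending = true;
                }
            }

            @Override
            public boolean hasNext() {
                fill();
                // The buffered element may be emitted only if another one follows it.
                return hasPending && src.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T result = pending;
                pending = src.next(); // the following element becomes the new candidate
                return result;
            }
        };
        // Sequential (parallel = false): the iterator is inherently single-threaded.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
                .onClose(in::close);
    }
}
```

Because it pulls lazily, it even works on an infinite source, e.g. `dropLast(Stream.iterate(1, x -> x + 1)).limit(3)` yields 1, 2, 3.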

sprinter
  • *The complex operations you use as examples all follow the pattern of an operation on one element in the stream depending on other elements in the stream.* "Replacing" an element could be implemented as a stateless map operation. *Java streams are specifically designed to not allow these types of operations without a collection or reduction.* The standard stream API provides stateful operations such as `distinct()` and `sorted()`, which by definition rely on comparing one element to another. – shmosel Aug 04 '16 at 23:16
  • @shmosel I simplified my answer to avoid rehashing all the details in https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html. You are correct that stateful intermediate operations exist. As noted in that doc, stateful operations (e.g. sorted) may need to collect the entire stream to work. That's what I meant by 'without collection or reduction'. I'll edit my answer to make that a bit clearer. – sprinter Aug 04 '16 at 23:26

You could just call and return any of the standard stream operations, such as filter, map, reduce, etc., and have them perform some complex operation, e.g. one that requires external data. For example, filterAdjacentDupes and replaceNthElement could be implemented like this:

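import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

// Drops an element if it equals the previously seen one (assumes non-null elements).
public static <T> Stream<T> filterAdjacentDupes(Stream<T> stream) {
    AtomicReference<T> last = new AtomicReference<>();
    return stream.filter(t -> ! t.equals(last.getAndSet(t)));
}

// Replaces the n-th element (1-based) that reaches this stage with repl.
public static <T> Stream<T> replaceNthElement(Stream<T> stream, int n, T repl) {
    AtomicInteger count = new AtomicInteger();
    return stream.map(t -> count.incrementAndGet() == n ? repl : t);
}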

Example usage:

List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo");
replaceNthElement(filterAdjacentDupes(lst.stream()), 3, "BAR").forEach(System.out::println);
// Output: foo bar BAR foo

However, as noted in comments this is not really how the Stream API is supposed to be used. In particular, operations such as these two will fail when given parallel streams.

tobias_k
    Note that both `filterAdjacentDupes` and `replaceNthElement` are broken when the input Stream is parallel. – Tunaki Aug 02 '16 at 12:44
    Note that the stream documentation advises against stateful non-terminal operations. It's possible to ignore that advice (as in this answer) but the outcomes are not defined by the language. So there is no guarantee this will work on all implementations or in future versions. – sprinter Aug 02 '16 at 12:52
  • I agree that this is probably not perfect, but it might be adequate for OP's requirements. Thanks for the warnings, though. – tobias_k Aug 02 '16 at 12:54
  • @Tunaki Are you sure, it would work this way in case of `map`, not `parallelMap`? – Dmitry Ginzburg Aug 02 '16 at 12:54
    @DmitryGinzburg `parallelMap` doesn't exist? But yes, run the code in the answer with `lst.stream().parallel()` given as parameter and you can start having very weird results. – Tunaki Aug 02 '16 at 12:58
  • @Tunaki Oops, you're right. Dunno why I wrote it like this – Dmitry Ginzburg Aug 02 '16 at 12:59
  • Strange to use atomic wrappers in light of @Tunaki's observation. – shmosel Aug 04 '16 at 23:24

The correct (though not very easy) way to do this is to write your own Spliterator. The common algorithm is the following:

  1. Take the existing stream Spliterator using stream.spliterator()
  2. Write your own Spliterator, which may consume elements of the existing one when advancing, possibly performing some additional operations.
  3. Create a new stream based on your spliterator via StreamSupport.stream(spliterator, stream.isParallel())
  4. Delegate close() call to original stream like .onClose(stream::close).

Writing a good spliterator that parallelizes well is often a very non-trivial task. However, if you don't care about parallelization, you may subclass AbstractSpliterator, which is simpler. Here's an example of how to write a new Stream operation that removes the element at a given position:

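import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public static <T> Stream<T> removeAt(Stream<T> src, int idx) {
    Spliterator<T> spltr = src.spliterator();
    // SIZED/SUBSIZED are cleared because nothing is removed when idx is out of range;
    // SORTED is cleared because AbstractSpliterator does not report a comparator.
    int chars = spltr.characteristics()
            & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.SORTED);
    Spliterator<T> res = new AbstractSpliterator<T>(Math.max(0, spltr.estimateSize()-1), chars) {
        long cnt = 0; // position of the next element to be emitted

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // at position idx, silently skip one source element first
            if(cnt++ == idx && !spltr.tryAdvance(x -> {}))
                return false;
            return spltr.tryAdvance(action);
        }
    };
    return StreamSupport.stream(res, src.isParallel()).onClose(src::close);
}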

This is a minimal implementation, and it can be improved for better performance and parallelism.
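Following the same four steps, the question's "remove adjacent duplicates" operation might be sketched like this. The class and method names are hypothetical, and like removeAt it makes no attempt at efficient parallel splitting; only ORDERED and NONNULL are carried over, since the output size is unknown in advance.

```java
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class DedupeSpliterator {
    // Hypothetical helper: removes adjacent duplicates (like Unix `uniq`) lazily.
    public static <T> Stream<T> removeAdjacentDuplicates(Stream<T> src) {
        Spliterator<T> spltr = src.spliterator();                 // step 1
        int chars = spltr.characteristics()
                & (Spliterator.ORDERED | Spliterator.NONNULL);    // size is no longer known
        Spliterator<T> res = new AbstractSpliterator<T>(spltr.estimateSize(), chars) { // step 2
            private boolean first = true; // nothing emitted yet
            private T last;               // most recently emitted element
            private T current;            // element just delivered by the source

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                // Pull from the source until an element differs from the last one emitted.
                while (spltr.tryAdvance(x -> current = x)) {
                    if (first || !Objects.equals(last, current)) {
                        first = false;
                        last = current;
                        action.accept(current);
                        return true;
                    }
                }
                return false;
            }
        };
        return StreamSupport.stream(res, src.isParallel())       // step 3
                .onClose(src::close);                             // step 4
    }
}
```

Only the previously emitted element is kept, so the input is never cached as a whole.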

In my StreamEx library I tried to simplify the addition of such custom stream operations via headTail. Here's how to do the same using StreamEx:

public static <T> StreamEx<T> removeAt(StreamEx<T> src, int idx) {
    // head is the first stream element
    // tail is the stream of the remaining elements
    // want to remove the first element? ok, just return the tail
    // otherwise call itself with decremented idx and prepend the head element to the result
    return src.headTail(
       (head, tail) -> idx == 0 ? tail : removeAt(tail, idx-1).prepend(head));
}

You can even support chaining with the chain() method:

public static <T> Function<StreamEx<T>, StreamEx<T>> removeAt(int idx) {
    return s -> removeAt(s, idx);
}

Usage example:

StreamEx.of("Java 8", "Stream", "API", "is", "not", "great")
        .chain(removeAt(4)).forEach(System.out::println);

Finally, note that even without headTail there are some ways to solve your problems using StreamEx. To remove the element at a specific index, you may zip with increasing numbers, then filter and drop the indexes like this:

StreamEx.of(stream)
        .zipWith(IntStreamEx.ints().boxed())
        .removeValues(pos -> pos == idx)
        .keys();

To collapse adjacent repeats, there's a dedicated collapse method (it even parallelizes quite well!):

StreamEx.of(stream).collapse(Object::equals);
Tagir Valeev

Building on tobias_k's answer and the ideas expressed in this question/update 2, we may just return proper Predicate and mapping functions that capture their local variables. (As a result, these functions are stateful, which is not ideal for streams, but the distinct() method in the Stream API is probably stateful too.)

Here is the modified code:

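import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class Foo {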
    public static void run() {
        List<String> lst = Arrays.asList("foo", "bar", "bar", "bar", "blub", "foo");
        lst.stream()
                .filter(Foo.filterAdjacentDupes())
                .map(Foo.replaceNthElement(3, "BAR"))
                .forEach(System.out::println);
        // Output: foo bar BAR foo
    }

    public static <T> Predicate<T> filterAdjacentDupes() {
        final AtomicReference<T> last = new AtomicReference<>();
        return t -> ! t.equals(last.getAndSet(t));
    }

    public static <T> UnaryOperator<T> replaceNthElement(int n, T repl) {
        final AtomicInteger count = new AtomicInteger();
        return t -> count.incrementAndGet() == n ? repl : t;
    }
}
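One consequence of this design: the state lives in the returned function, so each pipeline needs its own call to the factory. Reusing a single Predicate carries the remembered element over into the next stream. A sketch illustrating this (PredicateReuse is a hypothetical class name; the factory is copied from the code above):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PredicateReuse {
    // Same factory as above: the returned predicate remembers the last element it saw.
    public static <T> Predicate<T> filterAdjacentDupes() {
        final AtomicReference<T> last = new AtomicReference<>();
        return t -> !t.equals(last.getAndSet(t));
    }

    public static void main(String[] args) {
        Predicate<String> shared = filterAdjacentDupes();
        List<String> first = Arrays.asList("a", "b").stream()
                .filter(shared).collect(Collectors.toList());
        // 'shared' still remembers "b" from the first pipeline, so "b" is dropped here.
        List<String> second = Arrays.asList("b", "c").stream()
                .filter(shared).collect(Collectors.toList());
        // A fresh predicate per pipeline starts with no remembered element.
        List<String> fresh = Arrays.asList("b", "c").stream()
                .filter(PredicateReuse.<String>filterAdjacentDupes())
                .collect(Collectors.toList());
        System.out.println(first + " " + second + " " + fresh); // [a, b] [c] [b, c]
    }
}
```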
mp31415