32

What is the recommended way to transform a stream into a sliding window?

For instance, in Ruby you could use each_cons:

irb(main):020:0> [1,2,3,4].each_cons(2) { |x| puts x.inspect }
[1, 2]
[2, 3]
[3, 4]
=> nil
irb(main):021:0> [1,2,3,4].each_cons(3) { |x| puts x.inspect }
[1, 2, 3]
[2, 3, 4]
=> nil

In Guava, I found only Iterators#partition, which is related but does not produce a sliding window:

final Iterator<List<Integer>> partition =
   Iterators.partition(IntStream.range(1, 5).iterator(), 3);
partition.forEachRemaining(System.out::println);
-->
[1, 2, 3]
[4]
Philipp Claßen
  • 1
    Not all the answers speak only about pairs, but feel free to re-open if you wish. I still link it here: http://stackoverflow.com/questions/20470010/collect-successive-pairs-from-a-stream – Alexis C. Dec 08 '15 at 14:55

7 Answers

35

There's no such function in the API because the Stream API supports both sequential and parallel processing, and it's really hard to provide efficient parallel processing of a sliding-window function for an arbitrary stream source (even efficient parallel processing of pairs is quite hard; I implemented it, so I know).

However, if your source is a List with fast random access, you can use the subList() method to get the desired behavior like this:

public static <T> Stream<List<T>> sliding(List<T> list, int size) {
    if(size > list.size()) 
        return Stream.empty();
    return IntStream.range(0, list.size()-size+1)
                    .mapToObj(start -> list.subList(start, start+size));
}
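
As a quick check of the method above, here is a minimal, self-contained sketch. The class name SubListSlidingDemo is only illustrative. Note that each window is a subList view of the source list rather than a copy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SubListSlidingDemo {

    // Same approach as above: one subList view per start index.
    public static <T> Stream<List<T>> sliding(List<T> list, int size) {
        if (size > list.size())
            return Stream.empty();
        return IntStream.range(0, list.size() - size + 1)
                        .mapToObj(start -> list.subList(start, start + size));
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4);
        System.out.println(sliding(input, 2).collect(Collectors.toList()));
        // prints [[1, 2], [2, 3], [3, 4]]
        System.out.println(sliding(input, 3).collect(Collectors.toList()));
        // prints [[1, 2, 3], [2, 3, 4]]
        System.out.println(sliding(input, 5).collect(Collectors.toList()));
        // prints [] because the window is larger than the list
    }
}
```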

A similar method is actually available in my StreamEx library: see StreamEx.ofSubLists().

There are also some other third-party solutions which don't care about parallel processing and provide sliding functionality using some internal buffer. For example, protonpack StreamUtils.windowed.
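
To illustrate what "using some internal buffer" can look like, here is a rough sequential-only sketch built on plain JDK classes. It is not the protonpack implementation; the class name BufferedSliding and its sliding method are hypothetical. It assumes the source contains no null elements (ArrayDeque rejects them) and emits only full windows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class BufferedSliding {

    // Hypothetical helper: wraps the source iterator and keeps the last `size` elements in a buffer.
    public static <T> Stream<List<T>> sliding(Stream<T> source, int size) {
        if (size < 1)
            throw new IllegalArgumentException("size must be positive");
        Iterator<T> it = source.iterator();
        Iterator<List<T>> windows = new Iterator<List<T>>() {
            private final Deque<T> buffer = new ArrayDeque<>(size);

            @Override
            public boolean hasNext() {
                // Top the buffer up to a full window, as far as the source allows.
                while (buffer.size() < size && it.hasNext())
                    buffer.addLast(it.next());
                return buffer.size() == size;
            }

            @Override
            public List<T> next() {
                if (!hasNext())
                    throw new NoSuchElementException();
                List<T> window = new ArrayList<>(buffer); // copy, so later shifts don't alter it
                buffer.removeFirst();                     // slide forward by one element
                return window;
            }
        };
        // The resulting stream is explicitly sequential (parallel = false).
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(windows, Spliterator.ORDERED), false)
            .onClose(source::close);
    }

    public static void main(String[] args) {
        System.out.println(sliding(Stream.of(1, 2, 3, 4), 3).collect(Collectors.toList()));
        // prints [[1, 2, 3], [2, 3, 4]]
        System.out.println(sliding(Stream.iterate(1, i -> i + 1), 2).limit(3).collect(Collectors.toList()));
        // prints [[1, 2], [2, 3], [3, 4]]
    }
}
```

Because windows are produced on demand, this also works on an infinite source as long as a short-circuiting operation such as limit follows.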

Tagir Valeev
  • Shouldn't it be range(0, list.size() / size + 1) and then make the sublists with (start * size, start * size + size) taking care of the last page not going beyond the list.size() – Triqui Oct 26 '17 at 15:11
  • 1
    @Triqui, see the question, it's for overlapping windows. What do you want is non-overlapping batches which is answered separately somewhere... – Tagir Valeev Oct 29 '17 at 12:11
14

If you're willing to use a third-party library and don't need parallelism, then jOOλ offers SQL-style window functions as follows:

int n = 2;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

yielding

[[1, 2], [2, 3], [3, 4]]

And

int n = 3;

System.out.println(
Seq.of(1, 2, 3, 4)
   .window(0, n - 1)
   .filter(w -> w.count() == n)
   .map(w -> w.window().toList())
   .toList()
);

yielding

[[1, 2, 3], [2, 3, 4]]

Here's a blog post about how this works.

Disclaimer: I work for the company behind jOOλ

Lukas Eder
  • Does it support parallel processing? – Normal Nov 30 '16 at 10:39
  • 1
    @Normal: No. By *"and don't need parallelism"*, I mean that jOOλ is explicitly for sequential-only processing (which is mostly good enough) – Lukas Eder Dec 01 '16 at 20:57
  • For some reason the `window` clause is no longer in the wiki documentation @LukasEder – Ashwin Jayaprakash Aug 10 '18 at 04:26
  • @AshwinJayaprakash: What wiki documentation? – Lukas Eder Aug 10 '18 at 07:16
  • @LukasEder I meant the https://github.com/jOOQ/jOOL/blob/master/README.md file does not mention the `window` function. I had to read the test code to find out more: https://github.com/jOOQ/jOOL/blob/db26c4bb9513348e7512f346130553d4e1dd3c6f/jOOL-java-8/src/test/java/org/jooq/lambda/SeqTest.java – Ashwin Jayaprakash Sep 09 '18 at 22:05
  • @AshwinJayaprakash: I see, thanks for pointing this out. Will add some examples. For the time being, you can refer to this blog post: https://blog.jooq.org/2016/01/06/2016-will-be-the-year-remembered-as-when-java-finally-had-window-functions/ – Lukas Eder Sep 10 '18 at 07:10
8

Another option: cyclops-react builds on top of jOOλ's Seq interface (and the JDK 8 Stream), while simple-react builds concurrency / parallelism back in (if that is important to you) by creating Streams of Futures.

You can use Lukas's powerful windowing functions with either library (as we extend the awesome jOOλ), but there is also a sliding operator that I think simplifies things in this case and is suitable for use with infinite streams (i.e. it doesn't consume the stream, but buffers values as they flow through).

With ReactiveSeq it would look something like this:

ReactiveSeq.of(1, 2, 3, 4)
           .sliding(2)
           .forEach(System.out::println);

With LazyFutureStream it could look something like the example below:

 LazyFutureStream.iterate(1,i->i+1)
                 .sliding(3,2) //lists of 3, increment 2
                 .forEach(System.out::println);

Equivalent static methods for creating a sliding view over java.util.stream.Stream are also provided in the cyclops-streams StreamUtils class.

       StreamUtils.sliding(Stream.of(1,2,3,4),2)
                  .map(Pair::new);

If you would like to work directly with each sliding view, you can make use of the slidingT operator, which returns a List Transformer. For example, to add a number to each element in each sliding view and then reduce each sliding window to the sum of its elements, we can do:

        ReactiveSeq<Integer> windowsSummed = ReactiveSeq.fromIterable(data)
                                                        .slidingT(3)
                                                        .map(a->a+toAdd)
                                                        .reduce(0,(a,b)->a+b)
                                                        .stream();

Disclaimer: I work for the company behind cyclops-react

John McClean
5

If you want to bring the whole power of Scala's persistent collections to Java, you may use the library Vavr, formerly called Javaslang.

// this imports List, Stream, Iterator, ...
import io.vavr.collection.*;

Iterator.range(1, 5).sliding(3)
        .forEach(System.out::println);
// --->
// List(1, 2, 3)
// List(2, 3, 4)

Iterator.range(1, 5).sliding(2, 3)
        .forEach(System.out::println);
// --->
// List(1, 2)
// List(4)

Iterator.ofAll(javaStream).sliding(3);

You are not limited to Iterator; this also works for almost any other Vavr collection: Array, Vector, List, Stream, Queue, HashSet, LinkedHashSet, TreeSet, ...

(Overview Javaslang 2.1.0-alpha)

Disclaimer: I'm the creator of Vavr, formerly called Javaslang.

Danilo Piazzalunga
Daniel Dietrich
3

I found a solution on Tomek Nurkiewicz's blog (https://www.nurkiewicz.com/2014/07/grouping-sampling-and-batching-custom.html). Below is the SlidingCollector, which you can use:

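import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import static java.lang.Math.max;
import static java.util.stream.Collectors.toList;

public class SlidingCollector<T> implements Collector<T, List<List<T>>, List<List<T>>> {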

    private final int size;
    private final int step;
    private final int window;
    private final Queue<T> buffer = new ArrayDeque<>();
    private int totalIn = 0;

    public SlidingCollector(int size, int step) {
        this.size = size;
        this.step = step;
        this.window = max(size, step);
    }

    @Override
    public Supplier<List<List<T>>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<List<T>>, T> accumulator() {
        return (lists, t) -> {
            buffer.offer(t);
            ++totalIn;
            if (buffer.size() == window) {
                dumpCurrent(lists);
                shiftBy(step);
            }
        };
    }

    @Override
    public Function<List<List<T>>, List<List<T>>> finisher() {
        return lists -> {
            if (!buffer.isEmpty()) {
                final int totalOut = estimateTotalOut();
                if (totalOut > lists.size()) {
                    dumpCurrent(lists);
                }
            }
            return lists;
        };
    }

    private int estimateTotalOut() {
        return max(0, (totalIn + step - size - 1) / step) + 1;
    }

    private void dumpCurrent(List<List<T>> lists) {
        final List<T> batch = buffer.stream().limit(size).collect(toList());
        lists.add(batch);
    }

    private void shiftBy(int by) {
        for (int i = 0; i < by; i++) {
            buffer.remove();
        }
    }

    @Override
    public BinaryOperator<List<List<T>>> combiner() {
        return (l1, l2) -> {
            throw new UnsupportedOperationException("Combining not possible");
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.noneOf(Characteristics.class);
    }

}

Below are some examples by Tomek in Spock (I hope they are readable):

import static com.nurkiewicz.CustomCollectors.sliding

@Unroll
class CustomCollectorsSpec extends Specification {

    def "Sliding window of #input with size #size and step of 1 is #output"() {
        expect:
        input.stream().collect(sliding(size)) == output

        where:
        input  | size | output
        []     | 5    | []
        [1]    | 1    | [[1]]
        [1, 2] | 1    | [[1], [2]]
        [1, 2] | 2    | [[1, 2]]
        [1, 2] | 3    | [[1, 2]]
        1..3   | 3    | [[1, 2, 3]]
        1..4   | 2    | [[1, 2], [2, 3], [3, 4]]
        1..4   | 3    | [[1, 2, 3], [2, 3, 4]]
        1..7   | 3    | [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
        1..7   | 6    | [1..6, 2..7]
    }

    def "Sliding window of #input with size #size and no overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, size)) == output

        where:
        input | size | output
        []    | 5    | []
        1..3  | 2    | [[1, 2], [3]]
        1..4  | 4    | [1..4]
        1..4  | 5    | [1..4]
        1..7  | 3    | [1..3, 4..6, [7]]
        1..6  | 2    | [[1, 2], [3, 4], [5, 6]]
    }

    def "Sliding window of #input with size #size and some overlapping is #output"() {
        expect:
        input.stream().collect(sliding(size, 2)) == output

        where:
        input | size | output
        []    | 5    | []
        1..4  | 5    | [[1, 2, 3, 4]]
        1..7  | 3    | [1..3, 3..5, 5..7]
        1..6  | 4    | [1..4, 3..6]
        1..9  | 4    | [1..4, 3..6, 5..8, 7..9]
        1..10 | 4    | [1..4, 3..6, 5..8, 7..10]
        1..11 | 4    | [1..4, 3..6, 5..8, 7..10, 9..11]
    }

    def "Sliding window of #input with size #size and gap of #gap is #output"() {
        expect:
        input.stream().collect(sliding(size, size + gap)) == output

        where:
        input | size | gap | output
        []    | 5    | 1   | []
        1..9  | 4    | 2   | [1..4, 7..9]
        1..10 | 4    | 2   | [1..4, 7..10]
        1..11 | 4    | 2   | [1..4, 7..10]
        1..12 | 4    | 2   | [1..4, 7..10]
        1..13 | 4    | 2   | [1..4, 7..10, [13]]
        1..13 | 5    | 1   | [1..5, 7..11, [13]]
        1..12 | 5    | 3   | [1..5, 9..12]
        1..13 | 5    | 3   | [1..5, 9..13]
    }

    def "Sampling #input taking every #nth th element is #output"() {
        expect:
        input.stream().collect(sliding(1, nth)) == output

        where:
        input  | nth | output
        []     | 1   | []
        []     | 5   | []
        1..3   | 5   | [[1]]
        1..6   | 2   | [[1], [3], [5]]
        1..10  | 5   | [[1], [6]]
        1..100 | 30  | [[1], [31], [61], [91]]
    }
}
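
For comparison with the SlidingCollector above, which keeps its buffer and totalIn as fields of the collector instance (so leftover state can carry over if the same instance is used for a second collect call), here is a simplified sketch that keeps all state in the accumulation container instead. It is not Tomek's implementation: the names SimpleSlidingCollector and State are hypothetical, the step is fixed at 1, and, unlike the original, it emits only full windows. It also assumes non-null elements and sequential use.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.stream.Collector;

public class SimpleSlidingCollector {

    // Mutable accumulation container: the rolling buffer plus the windows emitted so far.
    private static final class State<T> {
        final Deque<T> buffer = new ArrayDeque<>();
        final List<List<T>> windows = new ArrayList<>();
    }

    // Hypothetical simplified variant: step of 1, full windows only.
    public static <T> Collector<T, ?, List<List<T>>> sliding(int size) {
        if (size < 1)
            throw new IllegalArgumentException("size must be positive");
        return Collector.<T, State<T>, List<List<T>>>of(
            State::new,
            (state, t) -> {
                state.buffer.addLast(t);
                if (state.buffer.size() == size) {
                    state.windows.add(new ArrayList<>(state.buffer)); // snapshot of the current window
                    state.buffer.removeFirst();                       // slide forward by one element
                }
            },
            (a, b) -> {
                // As in the original, merging partial results is not supported.
                throw new UnsupportedOperationException("Combining not possible");
            },
            state -> state.windows);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.asList(1, 2, 3, 4).stream().collect(sliding(2)));
        // prints [[1, 2], [2, 3], [3, 4]]
        System.out.println(Arrays.asList(1, 2).stream().collect(sliding(3)));
        // prints [] (the original collector would return [[1, 2]] here)
    }
}
```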
tmucha
1

Another option would be to implement a custom Spliterator, as was done here:

import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SlidingWindowSpliterator<T> implements Spliterator<Stream<T>> {

    static <T> Stream<Stream<T>> windowed(Collection<T> stream, int windowSize) {
        return StreamSupport.stream(
          new SlidingWindowSpliterator<>(stream, windowSize), false);
    }

    private final Queue<T> buffer;
    private final Iterator<T> sourceIterator;
    private final int windowSize;
    private final int size;

    private SlidingWindowSpliterator(Collection<T> source, int windowSize) {
        this.buffer = new ArrayDeque<>(windowSize);
        this.sourceIterator = Objects.requireNonNull(source).iterator();
        this.windowSize = windowSize;
        this.size = calculateSize(source, windowSize);
    }

    @Override
    public boolean tryAdvance(Consumer<? super Stream<T>> action) {
        if (windowSize < 1) {
            return false;
        }

        while (sourceIterator.hasNext()) {
            buffer.add(sourceIterator.next());

            if (buffer.size() == windowSize) {
                action.accept(Arrays.stream((T[]) buffer.toArray(new Object[0])));
                buffer.poll();
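                // A window was passed to the action, so report true as the
                // Spliterator.tryAdvance contract requires; the next call returns false.
                return true;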
            }
        }

        return false;
    }

    @Override
    public Spliterator<Stream<T>> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
       return size;
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | SIZED;
    }

    private static int calculateSize(Collection<?> source, int windowSize) {
        return source.size() < windowSize
          ? 0
          : source.size() - windowSize + 1;
    }
}
Grzegorz Piwowarek
0

I'm not really sure whether this is "safe" or "good", but maybe you can let me know.

public static <T> Stream<List<T>> sliding(Stream<T> stream, int window) {
    Queue<T> queue = new LinkedList<>();
    return stream.dropWhile(item -> {
        if (queue.size() < window - 1) {
            queue.add(item);
            return true;
        }
        return false;
    }).map(item -> {
        queue.add(item);
        List<T> ret = queue.stream().toList();
        queue.remove();
        return ret;
    });
}

public static void main(String[] args) {
    sliding(Stream.of(1, 2, 3, 4, 5, 6, 7), 3).forEach(x -> System.out.println(x));
}

Output:

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
William Papsco