
In Java, one can easily generate an infinite stream with Stream.generate(supplier). However, I need to generate a stream that eventually finishes.

Imagine, for example, that I want a stream of all files in a directory. The number of files can be huge, so I cannot gather all the data upfront and create a stream from it (via collection.stream()). I need to generate the sequence piece by piece. But the stream will obviously finish at some point, and terminal operations like collect() or findAny() need to work on it, so Stream.generate(supplier) is not suitable here.

Is there any reasonably easy way to do this in Java, without implementing the entire Stream interface on my own?

I can think of a simple hack: using an infinite Stream.generate(supplier) and returning null or throwing an exception once all the actual values have been taken. But that would break the standard stream operators; I could only use it with my own operators that are aware of this behaviour.

CLARIFICATION

People in the comments are suggesting the takeWhile() operator. That is not what I meant. To phrase the question better: I am not asking how to filter (or limit) an existing stream, I am asking how to create (generate) the stream dynamically, without loading all the elements upfront, while the stream still has a finite size (unknown in advance).

SOLUTION

The code I was looking for is

    Iterator<T> it = myCustomIteratorThatGeneratesTheSequence();
    Stream<T> stream = StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.DISTINCT), false);

I found it by looking at how the list(path) method is implemented in java.nio.file.Files.
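A self-contained sketch of the same pattern. `CountdownIterator` is a hypothetical stand-in for `myCustomIteratorThatGeneratesTheSequence()`: it computes each element only when `next()` is called and ends the sequence by returning `false` from `hasNext()`. The characteristics `ORDERED | NONNULL` are an assumption about this particular source (`Files.list` reports `DISTINCT` instead).

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorBackedStream {

    // Hypothetical lazy generator: nothing is materialized up front,
    // and the sequence ends after `start` elements.
    static final class CountdownIterator implements Iterator<Integer> {
        private int current;

        CountdownIterator(int start) {
            this.current = start;
        }

        @Override
        public boolean hasNext() {
            return current > 0;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return current--;
        }
    }

    // Wraps the iterator in a spliterator of unknown size and exposes it as a sequential stream.
    public static Stream<Integer> countdown(int start) {
        Iterator<Integer> it = new CountdownIterator(start);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED | Spliterator.NONNULL),
                false);
    }

    public static void main(String[] args) {
        // collect() terminates because the stream ends when hasNext() returns false.
        List<Integer> all = countdown(5).collect(Collectors.toList());
        System.out.println(all); // [5, 4, 3, 2, 1]
    }
}
```

Only characteristics that really hold for the source should be reported, since the stream framework may optimize based on them.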

Jan X Marek
  • I don't think I understand. Are you looking for some sort of `takeWhile` like here http://stackoverflow.com/q/20746429/1743880? – Tunaki Mar 16 '16 at 22:11
  • Have you taken a look at methods like [IntStream.range](https://docs.oracle.com/javase/8/docs/api/java/util/stream/IntStream.html#range-int-int-) and friends? – Javier Martín Mar 16 '16 at 22:13
  • `openjdk 9` provides `takeWhile()` – Andrew Tobilko Mar 16 '16 at 22:13
  • No, not really. I will think about how to rephrase my question. It's not about implementing a stream filter. It's about how to generate the original stream itself. – Jan X Marek Mar 16 '16 at 22:14
  • Let's go back to my example of a stream returning all files in a given directory. You cannot read all the files upfront and then create the stream with collectionOfFiles.stream(), because there could be millions of them and it could run out of memory. The list of files has to be read bit by bit and fed into the stream gradually, as elements are consumed at the other end. Just like with the Stream.generate() method, except that the stream would not be infinite. – Jan X Marek Mar 16 '16 at 22:18
  • @JanXMarek One way would be to create an `Iterator` by using this answer http://stackoverflow.com/a/17959135/3973077 and then convert the `Iterator` to a `Stream` using this one http://stackoverflow.com/a/24511534/3973077. This way the iterator would only hold a small stack of files in memory and the stream will be lazily evaluated. It's a lot of work though. – Paul Boddington Mar 16 '16 at 22:22
  • @PaulBoddington Thanks, yes, that's what I meant. Pity you did not post it as an answer; you would have earned 25 rep :-) My stream will not actually iterate through files; that was just a simplified example from a domain familiar to everyone. – Jan X Marek Mar 16 '16 at 22:41
  • @JanXMarek Oh well. There's more to life than rep on SO! This is a really good question, so I don't get why it's downvoted. In Python you could combine the `yield` keyword with recursion to do it in a few lines. With Java it's a lot more complicated sadly. – Paul Boddington Mar 16 '16 at 22:44
  • I strongly recommend first checking whether the logic of a `Spliterator` fits better, before going to the lengths of implementing a more complicated `Iterator` only to wrap it in a `Spliterator` afterwards. See [this answer](http://stackoverflow.com/a/35228848/2711488) for an example… – Holger Mar 17 '16 at 16:33
  • Feel free to create an answer for your own question... – rogerdpack Jan 02 '18 at 19:36

4 Answers


> Is there any reasonably easy way to do this in Java, without implementing the entire Stream interface on my own?

A simple .limit() guarantees that it will terminate. But that's not always powerful enough.
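A minimal sketch of the `limit()` approach, assuming the element count is known before the stream is consumed (the squares source is made up for illustration):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitDemo {

    // Stream.generate is infinite; limit(n) makes it stop after n elements,
    // but n has to be known before the stream is consumed.
    public static List<Integer> firstSquares(int n) {
        AtomicInteger counter = new AtomicInteger();
        return Stream.generate(() -> {
                    int i = counter.incrementAndGet();
                    return i * i;
                })
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstSquares(4)); // [1, 4, 9, 16]
    }
}
```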

Apart from the Stream factory methods, the simplest approach for creating custom stream sources without reimplementing the stream processing pipeline is to subclass java.util.Spliterators.AbstractSpliterator<T> and pass it (via a Supplier) to java.util.stream.StreamSupport.stream(Supplier<? extends Spliterator<T>>, int, boolean).
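A sketch of this approach for a source that is read page by page, which is close to the directory example in the question. `PagedSpliterator`, `pagedStream` and the `fetchPage` callback are hypothetical names, and an empty page is assumed to mark the end of the data. The `int` characteristics passed to `StreamSupport.stream` must equal what the supplied spliterator reports.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PagedSource {

    private static final int CHARACTERISTICS = Spliterator.ORDERED | Spliterator.NONNULL;

    // Hypothetical spliterator that pulls one page at a time; an empty page ends the data.
    static final class PagedSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
        private final IntFunction<List<T>> fetchPage;
        private Iterator<T> buffer = Collections.emptyIterator();
        private int nextPage = 0;
        private boolean exhausted = false;

        PagedSpliterator(IntFunction<List<T>> fetchPage) {
            super(Long.MAX_VALUE, CHARACTERISTICS); // Long.MAX_VALUE means "size unknown"
            this.fetchPage = fetchPage;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Fetch the next page only when the current one is used up.
            while (!buffer.hasNext()) {
                if (exhausted) {
                    return false;
                }
                List<T> page = fetchPage.apply(nextPage++);
                if (page.isEmpty()) {
                    exhausted = true;
                    return false;
                }
                buffer = page.iterator();
            }
            action.accept(buffer.next());
            return true;
        }
    }

    // The Supplier overload defers creating the spliterator until the terminal operation starts.
    public static <T> Stream<T> pagedStream(IntFunction<List<T>> fetchPage) {
        return StreamSupport.stream(() -> new PagedSpliterator<T>(fetchPage), CHARACTERISTICS, false);
    }

    public static void main(String[] args) {
        List<String> backing = List.of("a", "b", "c", "d", "e");
        int pageSize = 2;
        // Stand-in for a remote call or directory read that returns one page per index.
        IntFunction<List<String>> fetch = page -> {
            int from = Math.min(page * pageSize, backing.size());
            int to = Math.min(from + pageSize, backing.size());
            return backing.subList(from, to);
        };
        System.out.println(pagedStream(fetch).collect(Collectors.toList())); // [a, b, c, d, e]
    }
}
```

Because pages are fetched inside `tryAdvance`, a short-circuiting operation such as `findFirst()` only triggers as many fetches as it needs.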

If you intend to use parallel streams, note that AbstractSpliterator only yields suboptimal splitting. If you have more control over your source, fully implementing the Spliterator interface can do better.
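For a source whose bounds are known, a full implementation can split exactly in half. A sketch with a hypothetical `RangeSpliterator` over `[from, to)`; for a source of unknown size, like the one in the question, such precise splitting is usually not possible, which is why `AbstractSpliterator` falls back to copying batches of elements into arrays.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class RangeSpliteratorDemo {

    // Hypothetical hand-written spliterator over the half-open range [from, to).
    static final class RangeSpliterator implements Spliterator<Integer> {
        private int from;
        private final int to;

        RangeSpliterator(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            if (from >= to) {
                return false;
            }
            action.accept(from++);
            return true;
        }

        @Override
        public Spliterator<Integer> trySplit() {
            long remaining = (long) to - from; // long avoids int overflow for wide ranges
            if (remaining < 2) {
                return null; // too small to split
            }
            int mid = (int) (from + remaining / 2);
            Spliterator<Integer> prefix = new RangeSpliterator(from, mid); // ORDERED: hand off the prefix
            from = mid;                                                    // and keep the suffix
            return prefix;
        }

        @Override
        public long estimateSize() {
            return Math.max(0L, (long) to - from); // exact, hence SIZED and SUBSIZED
        }

        @Override
        public int characteristics() {
            return ORDERED | SIZED | SUBSIZED | DISTINCT | NONNULL | IMMUTABLE;
        }
    }

    public static Stream<Integer> range(int from, int to, boolean parallel) {
        return StreamSupport.stream(new RangeSpliterator(from, to), parallel);
    }

    public static void main(String[] args) {
        System.out.println(range(0, 1_000, true).mapToLong(Integer::longValue).sum()); // 499500
        List<Integer> ordered = range(0, 10, true).collect(Collectors.toList());
        System.out.println(ordered); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```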

For example, the following snippet would create a Stream providing an infinite sequence 1,2,3...
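A minimal sketch of such a source, assuming a hypothetical `CountingSpliterator` built on `Spliterators.AbstractSpliterator`. Its `tryAdvance` never returns `false`, so `limit` is needed before collecting:

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class NaturalNumbers {

    // Emits 1, 2, 3, ... forever; Long.MAX_VALUE without SIZED means "size unknown".
    static final class CountingSpliterator extends Spliterators.AbstractSpliterator<Long> {
        private long next = 1;

        CountingSpliterator() {
            super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.DISTINCT);
        }

        @Override
        public boolean tryAdvance(Consumer<? super Long> action) {
            action.accept(next++);
            return true; // there is always another element
        }
    }

    public static Stream<Long> naturals() {
        return StreamSupport.stream(new CountingSpliterator(), false);
    }

    public static void main(String[] args) {
        List<Long> firstFive = naturals().limit(5).collect(Collectors.toList());
        System.out.println(firstFive); // [1, 2, 3, 4, 5]
    }
}
```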

In that particular example you could simply use IntStream.range() (or IntStream.iterate() if the sequence really has to be unbounded).
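For comparison, the built-in `IntStream` factories produce the same numbers without any custom spliterator (the method names below are illustrative):

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class BuiltInRanges {

    // Finite 1..n: range is half-open, so the upper bound is n + 1.
    public static int[] viaRange(int n) {
        return IntStream.range(1, n + 1).toArray();
    }

    // Unbounded 1, 2, 3, ... (Java 8 two-argument iterate), cut off with limit.
    public static int[] viaIterate(int n) {
        return IntStream.iterate(1, i -> i + 1).limit(n).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(viaRange(5)));   // [1, 2, 3, 4, 5]
        System.out.println(Arrays.toString(viaIterate(5))); // [1, 2, 3, 4, 5]
    }
}
```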

> But the stream will obviously finish at some point, and terminal operations like collect() or findAny() need to work on it.

Short-circuiting operations like findAny() can actually finish on an infinite stream, as long as at least one element reaches them (for example, by passing a preceding filter).
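A small illustration: the source below is infinite, yet `findAny()` returns once an element passes the filter. If no element ever matched, the pipeline would run forever.

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuitDemo {

    // findAny short-circuits: it stops pulling from the infinite source
    // as soon as one element passes the filter.
    public static Optional<Integer> anyMultipleOf(int divisor, int greaterThan) {
        return Stream.iterate(1, i -> i + 1)
                .filter(i -> i > greaterThan && i % divisor == 0)
                .findAny();
    }

    public static void main(String[] args) {
        System.out.println(anyMultipleOf(7, 50));
    }
}
```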

Java 9 adds a three-argument overload, Stream.iterate(seed, hasNext, next), which generates finite streams for some simple cases.
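A sketch of the Java 9 overload, which behaves like a `for (x = seed; hasNext(x); x = next(x))` loop; the powers-of-two source is just an example:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IterateDemo {

    // The stream ends as soon as the hasNext predicate returns false.
    // x > 0 also stops it if doubling overflows into negative numbers.
    public static List<Integer> powersOfTwoBelow(int limit) {
        return Stream.iterate(1, x -> x > 0 && x < limit, x -> x * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwoBelow(100)); // [1, 2, 4, 8, 16, 32, 64]
    }
}
```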

the8472

Kotlin code (using Jackson) to create a Stream of JsonNode from an InputStream:


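    // Assumes a Jackson `objectMapper: ObjectMapper` is in scope.
    private fun InputStream.toJsonNodeStream(): Stream<JsonNode> {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(this.toJsonNodeIterator(), Spliterator.ORDERED),
                false
        )
    }

    private fun InputStream.toJsonNodeIterator(): Iterator<JsonNode> {
        val jsonParser = objectMapper.factory.createParser(this)

        return object : Iterator<JsonNode> {
            // Remembers that the parser already sits on a START_OBJECT, so calling
            // hasNext() several times in a row does not skip objects.
            private var positionedOnObject = false

            override fun hasNext(): Boolean {
                if (positionedOnObject) return true
                var token = jsonParser.nextToken()
                while (token != null) {
                    if (token == JsonToken.START_OBJECT) {
                        positionedOnObject = true
                        return true
                    }
                    token = jsonParser.nextToken()
                }
                return false
            }

            override fun next(): JsonNode {
                if (!hasNext()) throw NoSuchElementException()
                positionedOnObject = false
                // Reads the whole object (including nested ones) starting at the current START_OBJECT
                return jsonParser.readValueAsTree()
            }
        }
    }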

Alex

Here is a custom, finite stream:

    package org.tom.stream;

    import java.util.*;
    import java.util.function.*;
    import java.util.stream.*;

    public class GoldenStreams {
        private static final String IDENTITY = "";

        public static void main(String[] args) {
            Stream<String> stream = StreamSupport.stream(new Spliterator<String>() {
                private static final int LIMIT = 25;
                private int integer = 0;

                @Override
                public int characteristics() {
                    return Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.NONNULL;
                }

                @Override
                public long estimateSize() {
                    return LIMIT - integer;
                }

                @Override
                public boolean tryAdvance(Consumer<? super String> action) {
                    // Per the Spliterator contract, return false only when no element was emitted
                    if (integer >= LIMIT) {
                        return false;
                    }
                    action.accept(IDENTITY + integer++);
                    return true;
                }

                @Override
                public Spliterator<String> trySplit() {
                    System.out.println("trySplit");
                    return null; // never splits
                }
            }, false);

            List<String> peeks = new ArrayList<>();
            List<String> reds = new ArrayList<>();
            stream.peek(peeks::add)
                    .filter(data -> Integer.parseInt(data) % 2 > 0)
                    .peek(data -> System.out.println("peekDeux:" + data))
                    .reduce(IDENTITY, (accumulation, input) -> {
                        reds.add(input);
                        String concat = accumulation + (accumulation.isEmpty() ? IDENTITY : ":") + input;
                        System.out.println("reduce:" + concat);
                        return concat;
                    });
            System.out.println("Peeks:" + peeks);
            System.out.println("Reduction:" + reds);
        }
    }
tomk

Although the asker rejected the takeWhile option, I find it adequate for certain use cases and worth explaining.

The method takeWhile (available since Java 9) can be used on any stream and ends the stream as soon as the predicate passed to it returns false. The element that produced false is not included; only the elements for which the predicate returned true are passed downstream.

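So one way to generate a finite stream is to use Stream.generate and have the supplier return a sentinel value that the predicate given to takeWhile evaluates to false. Strictly speaking, Stream.generate produces an unordered stream, and for unordered streams the takeWhile specification allows any subset of the matching elements to be taken, so this trick relies on sequential evaluation.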

Here's an example that generates all the permutations of an array:

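    import java.util.Objects;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Stream;

    // Iterative form of Heap's algorithm. Needs Java 9+ (takeWhile) and Java 10+ (var).
    public static Stream<int[]> permutations(int[] original) {
        int dim = original.length;

        var permutation = original.clone(); // working array, mutated by each swap
        int[] controller = new int[dim];    // one counter per position
        var low = new AtomicInteger(0);     // AtomicInteger because lambdas need effectively final captures
        var up = new AtomicInteger(1);

        var permutationsStream = Stream.generate(() -> {
            while (up.get() < dim) {
                if (controller[up.get()] < up.get()) {
                    // even position: swap with index 0; odd position: swap with controller[up]
                    low.set(up.get() % 2 * controller[up.get()]);

                    var tmp = permutation[low.get()];
                    permutation[low.get()] = permutation[up.get()];
                    permutation[up.get()] = tmp;

                    controller[up.get()]++;
                    up.set(1);

                    return permutation.clone();
                } else {
                    controller[up.get()] = 0;
                    up.incrementAndGet();
                }
            }

            return null; // sentinel: every permutation has been produced
        }).takeWhile(Objects::nonNull);

        return Stream.concat(
                Stream.of(original.clone()), // the input itself is the first permutation
                permutationsStream
        );
    }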

In this example, I used the null value to signal the end of the stream. The caller of the method never receives the null value, because takeWhile drops it.

The OP could use a similar strategy and combine it with a visitor pattern.

If it's a flat directory, the OP would be better off using Stream.iterate with the index of the file to yield as the seed, and Stream.limit on the number of files (which can be known without browsing the directory).

Adrien H