You can make Streams with unknown size, but you have to know when the last element has been reached. This can be done in multiple ways by the user, but for this example I assume the user's implementation of pageResourceFunction will return null if the offset is past the last element.
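To make that assumed contract concrete, here is a minimal sketch of a page function that returns null once the offset is past the last element. The names PagingContractSketch, fetchPage and DATA are hypothetical, and a plain List stands in for the Page type to keep the example short:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the contract assumed in this answer:
// the page function returns null once the offset is past the last element.
class PagingContractSketch {

    // Stand-in for a real data source (for example a REST endpoint or a database).
    static final List<String> DATA = Arrays.asList("a", "b", "c", "d", "e");

    // Returns up to 'limit' elements starting at 'offset', or null when none are left.
    static List<String> fetchPage(int offset, int limit) {
        if (offset >= DATA.size()) {
            return null; // signals "no more pages" to the caller
        }
        int end = Math.min(offset + limit, DATA.size());
        return DATA.subList(offset, end);
    }

    public static void main(String[] args) {
        System.out.println(fetchPage(0, 4)); // [a, b, c, d]
        System.out.println(fetchPage(4, 4)); // [e]
        System.out.println(fetchPage(8, 4)); // null
    }
}
```

Other end-of-data signals (an empty page, a hasNext flag on the page) would work just as well, as long as the stream-building code checks for the same signal.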
Java 9 and up
Java 9 introduced Stream.takeWhile(Predicate), which will short-circuit the Stream when an element does not match the Predicate. In this solution, we want to take pages from the pageResourceFunction while it does not return null. We apply takeWhile to an infinite Stream of offsets that generate the required pages:
private static final int LIMIT = 4;

public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
    return IntStream.iterate(0, i -> i + LIMIT) // Creates an infinite Stream with elements 0, 4, 8, 12, ...
            .mapToObj(offset -> pageResourceFunction.apply(offset, LIMIT))
            .takeWhile(Objects::nonNull)
            .flatMap(page -> page.getElements().stream());
}
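For reference, a self-contained way to try the Java 9 version might look like the sketch below. It repeats streamOf from above unchanged; the class name PageStreamJava9Demo, the minimal Page class (written without Lombok) and the numbers page source are assumptions made only for this illustration:

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PageStreamJava9Demo {
    static final int LIMIT = 4;

    // Minimal hypothetical Page type exposing getElements(), as the answer assumes.
    static final class Page<T> {
        private final List<T> elements;

        Page(List<T> elements) {
            this.elements = elements;
        }

        List<T> getElements() {
            return elements;
        }
    }

    // Same pipeline as in the answer: offsets 0, 4, 8, ... until a null page appears.
    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
        return IntStream.iterate(0, i -> i + LIMIT)
                .mapToObj(offset -> pageResourceFunction.apply(offset, LIMIT))
                .takeWhile(Objects::nonNull)
                .flatMap(page -> page.getElements().stream());
    }

    // Hypothetical page source holding the integers 0..9; returns null past the end.
    static Page<Integer> numbers(int offset, int limit) {
        if (offset >= 10) {
            return null;
        }
        List<Integer> elements = IntStream.range(offset, Math.min(offset + limit, 10))
                .boxed()
                .collect(Collectors.toList());
        return new Page<>(elements);
    }

    public static void main(String[] args) {
        List<Integer> all = streamOf(PageStreamJava9Demo::numbers).collect(Collectors.toList());
        System.out.println(all); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Note that the last page here holds only two elements; the pipeline does not care about the page size, only about the null that ends the sequence.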
Java 8
In Java 8 we have to make our own Iterator or Spliterator that takes elements while the pageResourceFunction does not return null, then wrap it into a Stream using StreamSupport. Reproducing Stream.takeWhile in Java 8 has been answered before in other questions, but I will also provide my implementation.
I will use the Spliterator for this example, since it provides the Stream with more hints on how to optimize its execution with methods like Spliterator.characteristics(). The Spliterator will advance over the Pages from pageResourceFunction until the pageResourceFunction returns null.
Finally, the Spliterator is wrapped into a Stream using StreamSupport.stream(Spliterator, boolean). Streams are always lazily-evaluated and are only fully evaluated when a terminal operation is called (for example Stream.collect()).
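 
To see that laziness in action, here is a small check that counts how often the page function is called. It is only a sketch: it uses Spliterators.AbstractSpliterator as a shortcut instead of a full Spliterator implementation, a plain List instead of Page, and the names LazyPagingCheck, CALLS and page are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyPagingCheck {
    static final int LIMIT = 4;

    // Counts how many times the page function has been invoked.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical page source: three pages (0..11), then null.
    static List<Integer> page(int offset, int limit) {
        CALLS.incrementAndGet();
        if (offset >= 12) {
            return null;
        }
        List<Integer> result = new ArrayList<>();
        for (int i = offset; i < offset + limit; i++) {
            result.add(i);
        }
        return result;
    }

    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, List<T>> pageFunction) {
        // Unknown size (Long.MAX_VALUE) and ORDERED, as in the answer's Spliterator.
        Spliterator<List<T>> pages =
                new Spliterators.AbstractSpliterator<List<T>>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    private int offset = 0;

                    @Override
                    public boolean tryAdvance(Consumer<? super List<T>> action) {
                        List<T> page = pageFunction.apply(offset, LIMIT);
                        if (page == null) {
                            return false; // no more pages
                        }
                        offset += LIMIT;
                        action.accept(page);
                        return true;
                    }
                };
        return StreamSupport.stream(pages, false).flatMap(List::stream);
    }

    public static void main(String[] args) {
        Stream<Integer> stream = streamOf(LazyPagingCheck::page);
        System.out.println(CALLS.get()); // 0, nothing has been fetched yet

        List<Integer> all = stream.collect(Collectors.toList());
        System.out.println(all.size() + " elements, " + CALLS.get() + " calls"); // 12 elements, 4 calls
    }
}
```

Building the pipeline does not call the page function at all; only collect() triggers the four calls (three pages plus the final call that returns null). The complete Java 8 implementation, including a test, follows.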
import lombok.Value;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PageProviderJava8 {
    private static final int LIMIT = 4;

    public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
        class PageSpliterator implements Spliterator<Page<T>> {
            private int offset = 0;

            @Override
            public boolean tryAdvance(Consumer<? super Page<T>> action) {
                Page<T> page = pageResourceFunction.apply(offset, LIMIT);
                if (page == null) {
                    return false; // Last page is reached, stop the Spliterator
                }
                offset += LIMIT;
                action.accept(page);
                return true;
            }

            @Override
            public Spliterator<Page<T>> trySplit() {
                return null; // Should return non-null values if you want to use parallel streams
            }

            @Override
            public long estimateSize() {
                return Long.MAX_VALUE; // The size is unknown, so we return Long.MAX_VALUE
            }

            /*
             * Must be ordered, otherwise elements are requested from multiple pages,
             * which this Spliterator does not support.
             * Other characteristics depend on the implementation of Page.elements
             */
            @Override
            public int characteristics() {
                return ORDERED;
            }
        }

        return StreamSupport.stream(new PageSpliterator(), false)
                .flatMap(page -> page.getElements().stream());
    }

    // POJOs
    @Value
    public static class Page<T> {
        Collection<T> elements;
    }

    @Value
    public static class User {
        int id;
    }

    public static class PageProviderJava8Test {
        // Creates a Page of Users with ids offset to offset + limit. Will return null after 20 Users.
        public static Page<User> users(int offset, int limit) {
            if (offset >= 20) {
                return null;
            }
            Collection<User> elements = IntStream.range(offset, offset + limit)
                    .mapToObj(User::new)
                    .collect(Collectors.toList());
            return new Page<>(elements);
        }

        @Test
        public void testPages() {
            List<User> users = PageProviderJava8.streamOf(PageProviderJava8Test::users)
                    .collect(Collectors.toList());
            Assertions.assertThat(users)
                    .hasSize(20);
            Assertions.assertThat(users.get(0).getId()).isEqualTo(0);
            Assertions.assertThat(users.get(19).getId()).isEqualTo(19);
        }
    }
}
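For comparison, the Iterator route mentioned earlier could look roughly like the sketch below. This is not the implementation used in this answer, only an illustration under a few assumptions: a plain List stands in for Page, and the names PageIteratorSketch and letters are hypothetical. The Iterator is wrapped with Spliterators.spliteratorUnknownSize before being handed to StreamSupport:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PageIteratorSketch {
    static final int LIMIT = 4;

    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, List<T>> pageFunction) {
        Iterator<List<T>> pages = new Iterator<List<T>>() {
            private int offset = 0;
            private List<T> next;  // page fetched ahead by hasNext(), if any
            private boolean done;  // true once the page function has returned null

            @Override
            public boolean hasNext() {
                if (next == null && !done) {
                    next = pageFunction.apply(offset, LIMIT);
                    offset += LIMIT;
                    done = (next == null);
                }
                return next != null;
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> page = next;
                next = null; // force a fetch on the following hasNext() call
                return page;
            }
        };
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(pages, Spliterator.ORDERED), false)
                .flatMap(List::stream);
    }

    // Hypothetical page source over six letters; returns null past the end.
    static List<String> letters(int offset, int limit) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f");
        if (offset >= data.size()) {
            return null;
        }
        return data.subList(offset, Math.min(offset + limit, data.size()));
    }

    public static void main(String[] args) {
        System.out.println(streamOf(PageIteratorSketch::letters).collect(Collectors.toList()));
        // [a, b, c, d, e, f]
    }
}
```

The Iterator needs one page of look-ahead state because hasNext() must know whether another page exists before next() is called, which is one reason the Spliterator version with a single tryAdvance method ends up shorter.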