Here is an example:

There are several services that return a set of items as a page:

public Page<User> users(int offset, int limit);
public Page<Group> groups(int offset, int limit);

Consumers of such services implement the pagination logic on their side. It is not complicated, but it is still error-prone and time-consuming.

I am trying to wrap this logic inside:

public <T> Stream<T> streamOf(final BiFunction<Integer, Integer, PageResource<T>> pageResourceFunction);

The pageResourceFunction accepts offset and limit as input parameters and returns the page of elements.

A consumer should get a stream as:

Stream<User> users = streamOf(service::users);

The real users are not loaded yet. They should be loaded only when the consumer starts iterating over the stream.

  1. Is it possible in Java to construct a stream lazily (to return it from streamOf) when the number of pages/elements is not known in advance?
  2. If the 1st option is not possible, I can simplify the task a bit. Quite often Page contains the total number of elements, so in streamOf I can get the first page, read the total number of elements and calculate how many pages exist. But even this simplification does not help me to implement streamOf.
Alexandr

2 Answers

You can make Streams with unknown size, but you have to know when the last element has been reached. This can be done in multiple ways by the user, but for this example I assume the user's implementation of pageResourceFunction will return null if the offset is past the last element.

Java 9 and up

Java 9 introduced Stream.takeWhile(Predicate), which will short-circuit the Stream when an element does not match the Predicate. In this solution, we want to take pages from the pageResourceFunction while it does not return null. We apply takeWhile to an infinite Stream of offsets that generate the required pages:

private static final int LIMIT = 4;

public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
    return IntStream.iterate(0, i -> i + LIMIT)  // Creates an infinite Stream with elements 0, 4, 8, 12, ...
            .mapToObj(offset -> pageResourceFunction.apply(offset, LIMIT))
            .takeWhile(Objects::nonNull)
            .flatMap(page -> page.getElements().stream());
}
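
To check the laziness the question asks about, the pipeline above can be run against a stub that counts its calls. This is only a sketch: the Page class, the ids service and the CALLS counter are hypothetical stand-ins, and the stub follows the same assumption as the answer (null once the offset is past the last element).

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LazyPagesDemo {

    private static final int LIMIT = 4;

    // Hypothetical minimal Page type, used only for this demo
    static final class Page<T> {
        private final List<T> elements;
        Page(List<T> elements) { this.elements = elements; }
        List<T> getElements() { return elements; }
    }

    // Same pipeline as in the answer (requires Java 9+ for takeWhile)
    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
        return IntStream.iterate(0, i -> i + LIMIT)
                .mapToObj(offset -> pageResourceFunction.apply(offset, LIMIT))
                .takeWhile(Objects::nonNull)
                .flatMap(page -> page.getElements().stream());
    }

    // Counts how often the stub service is called
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical service: 20 ids in total, null once the offset is past the end
    static Page<Integer> ids(int offset, int limit) {
        CALLS.incrementAndGet();
        if (offset >= 20) {
            return null;
        }
        List<Integer> elements = IntStream.range(offset, Math.min(offset + limit, 20))
                .boxed()
                .collect(Collectors.toList());
        return new Page<>(elements);
    }

    public static void main(String[] args) {
        Stream<Integer> stream = streamOf(LazyPagesDemo::ids);
        // Nothing has been requested yet: no terminal operation has run
        System.out.println("calls before terminal operation: " + CALLS.get());

        List<Integer> all = stream.collect(Collectors.toList());
        // Five pages with data plus the final call that returns null
        System.out.println("elements: " + all.size() + ", calls: " + CALLS.get());
    }
}
```

The service is not touched until collect runs. The last call is the one that returns null and ends the stream.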

Java 8

In Java 8 we have to make our own Iterator or Spliterator that takes elements while the pageResourceFunction does not return null, then wrap it into a Stream using StreamSupport. Reproducing Stream.takeWhile in Java 8 has been answered before in other questions, but I will also provide my implementation.

I will use a Spliterator for this example, since it gives the Stream more hints on how to optimize its execution through methods like Spliterator.characteristics(). The Spliterator advances over the Pages from pageResourceFunction until the pageResourceFunction returns null.

Finally, the Spliterator is wrapped into a Stream using StreamSupport.stream(Spliterator, boolean). Streams are always lazily-evaluated and are only fully evaluated when a terminal operation is called (for example Stream.collect()).

import lombok.Value;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PageProviderJava8 {

    private static final int LIMIT = 4;

    public static <T> Stream<T> streamOf(final BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
        class PageSpliterator implements Spliterator<Page<T>> {

            private int offset = 0;

            @Override
            public boolean tryAdvance(Consumer<? super Page<T>> action) {
                Page<T> page = pageResourceFunction.apply(offset, LIMIT);
                if (page == null) {
                    return false; // Last page is reached, stop the Spliterator
                }
                offset += LIMIT;
                action.accept(page);
                return true;
            }

            @Override
            public Spliterator<Page<T>> trySplit() {
                return null; // Should return non-null values if you want to use parallel streams
            }

            @Override
            public long estimateSize() {
                return Long.MAX_VALUE; // The size is unknown, so we return Long.MAX_VALUE
            }

            /*
             * Must be ordered, otherwise elements are requested from multiple pages,
             * which this Spliterator does not support.
             * Other characteristics depend on the implementation of Page.elements
             */
            @Override
            public int characteristics() {
                return ORDERED;
            }
        }

        return StreamSupport.stream(new PageSpliterator(), false)
                .flatMap(page -> page.getElements().stream());
    }

    // POJOs
    @Value
    public static class Page<T> {
        Collection<T> elements;
    }

    @Value
    public static class User {
        int id;
    }

    public static class PageProviderJava8Test {

        // Creates a Page of Users with ids offset to offset + limit. Will return null after 20 Users.
        public static Page<User> users(int offset, int limit) {
            if (offset >= 20) {
                return null;
            }
            Collection<User> elements = IntStream.range(offset, offset + limit)
                    .mapToObj(User::new)
                    .collect(Collectors.toList());
            return new Page<>(elements);
        }

        @Test
        public void testPages() {
            List<User> users = PageProviderJava8.streamOf(PageProviderJava8Test::users)
                    .collect(Collectors.toList());
            Assertions.assertThat(users)
                    .hasSize(20);
            Assertions.assertThat(users.get(0).getId()).isEqualTo(0);
            Assertions.assertThat(users.get(19).getId()).isEqualTo(19);
        }
    }
}
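
The Iterator alternative mentioned above could look roughly like the sketch below. It is not part of the original implementation: the Page class is a hypothetical stand-in, and the null-terminated contract of pageResourceFunction is the same assumption as before. Spliterators.spliteratorUnknownSize turns the Iterator into a Spliterator, so only hasNext and next have to be written.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class PageIteratorSketch {

    private static final int LIMIT = 4;

    // Hypothetical stand-in for the Page type
    static final class Page<T> {
        private final Collection<T> elements;
        Page(Collection<T> elements) { this.elements = elements; }
        Collection<T> getElements() { return elements; }
    }

    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, Page<T>> pageResourceFunction) {
        Iterator<Page<T>> pages = new Iterator<Page<T>>() {
            private int offset = 0;
            private Page<T> next;   // page fetched by hasNext() but not handed out yet
            private boolean done;   // true once the function has returned null

            @Override
            public boolean hasNext() {
                if (next == null && !done) {
                    next = pageResourceFunction.apply(offset, LIMIT);
                    offset += LIMIT;
                    done = (next == null);
                }
                return next != null;
            }

            @Override
            public Page<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                Page<T> page = next;
                next = null;
                return page;
            }
        };

        // ORDERED keeps the pages in request order; NONNULL holds because null ends the iteration
        Spliterator<Page<T>> spliterator =
                Spliterators.spliteratorUnknownSize(pages, Spliterator.ORDERED | Spliterator.NONNULL);

        return StreamSupport.stream(spliterator, false)
                .flatMap(page -> page.getElements().stream());
    }
}
```

The Iterator version is shorter to write, while a hand-written Spliterator leaves more control over characteristics and splitting.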
Patrick Hooijer

Here is a solution for option #2. I could not find one for the 1st option.

I hope it will be useful for someone.

  private static final int LIMIT = 5;

  public <T> Stream<T> streamOf(final BiFunction<Integer, Integer, PageResource<T>> pageResourceFunction) {

    // The first page is fetched eagerly, because its total is needed to compute the page count
    PageResource<T> firstPage = pageResourceFunction.apply(0, LIMIT);

    return Stream.concat(
      firstPage.getContent().stream(),
      IntStream.range(1, numberOfPages(firstPage.getTotalElements()))
        .mapToObj(numberOfPage ->
          pageResourceFunction.apply(
            numberOfPage * LIMIT, LIMIT))
        // Use the same accessor as for the first page
        .flatMap(page -> page.getContent().stream()));
  }

  private int numberOfPages(final long total) {
    // Ceiling division, done in long before narrowing to int
    return (int) ((total + LIMIT - 1) / LIMIT);
  }
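
In the streamOf above, the first page is requested as soon as streamOf is called, before the consumer starts iterating. One possible way to defer that request, sketched under assumptions, is to wrap the whole body in the flatMap of a one-element stream, so it runs only when a terminal operation pulls elements. The PageResource class here is a hypothetical stand-in with getContent and getTotalElements.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class DeferredFirstPage {

    private static final int LIMIT = 5;

    // Hypothetical stand-in for PageResource
    static final class PageResource<T> {
        private final List<T> content;
        private final long totalElements;
        PageResource(List<T> content, long totalElements) {
            this.content = content;
            this.totalElements = totalElements;
        }
        List<T> getContent() { return content; }
        long getTotalElements() { return totalElements; }
    }

    static <T> Stream<T> streamOf(BiFunction<Integer, Integer, PageResource<T>> pageResourceFunction) {
        // The lambda passed to flatMap is not executed until a terminal operation runs
        return Stream.of(pageResourceFunction).flatMap(fn -> {
            PageResource<T> firstPage = fn.apply(0, LIMIT);
            int pages = numberOfPages(firstPage.getTotalElements());
            Stream<T> rest = IntStream.range(1, pages)
                    .mapToObj(page -> fn.apply(page * LIMIT, LIMIT))
                    .flatMap(p -> p.getContent().stream());
            return Stream.concat(firstPage.getContent().stream(), rest);
        });
    }

    static int numberOfPages(long total) {
        // Ceiling division: 12 elements with LIMIT 5 gives 3 pages
        return (int) ((total + LIMIT - 1) / LIMIT);
    }
}
```

One caveat: on JDK versions before 10, flatMap may consume more of the inner stream than a short-circuiting operation such as limit or findFirst needs, so more pages may be requested there than are strictly required.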

Assume the original page resource function accepts additional arguments:

  PageResource<Node> subNodes(
    final int parentId,
    final int someAdditionalId,
    final int offset,
    final int limit);

Consumer:

Stream<Node> stream = streamOf((offset, limit) -> 
  service.subNodes(111, 222, offset, limit));
Alexandr