I have a Java stream of unknown length. I need to load some metadata from the database and assign it to the streamed elements.

I cannot:

  • load all elements of the stream into memory at once, populate the metadata, and then start a new stream, as this might use too much RAM.
  • load the metadata for each element individually, as this would flood my database with too many requests.

So I thought I could load the metadata from the database in partitions, with one query per batch of elements.
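The trade-off behind batching can be made concrete. Assuming N streamed elements and a partition size p (both symbols are introduced here for illustration only), the number of metadata queries and the number of elements held per batch are:

```latex
\text{queries} = \left\lceil \frac{N}{p} \right\rceil, \qquad \text{elements per batch} \le p
```

For example, N = 1,000,000 and p = 1000 give 1000 queries instead of 1,000,000 individual lookups, while each batch holds at most 1000 elements.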

I need a method like this:

<T> Stream<List<T>> partition(Stream<T> stream, int partitionSize)

so that I can use it like this:

partition(dataSource.stream(), 1000)
    .map(metadataSource::populate)
    .flatMap(List::stream)
    .forEach(this::doSomething);

I already found Guava's Iterables#partition, but that would force me to convert the stream to an Iterable, partition it, and convert the result back into a stream. Is there something built in for partitioning streams, or is there an easy way to implement it myself?

ST-DDT

  • [Why are you using `peek` to do actual work?](https://stackoverflow.com/a/33636457/1079354) – Makoto May 25 '18 at 15:16
  • @Makoto Good point, I changed it to use `.map(...)` instead. – ST-DDT May 25 '18 at 15:42
  • If I understand correctly, your problem is a streamed pagination. Does a combination of [skip](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#skip-long-) and [limit](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#limit-long-) help you? – Shinigami May 28 '18 at 06:19
  • No, this problem has nothing to do with pagination; it's all about performance optimization. – ST-DDT May 28 '18 at 07:36
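The skip/limit suggestion runs into the fact that a Java stream can be traversed only once. A minimal sketch (the class SkipLimitDemo and its page helper are hypothetical, written only for this illustration) showing that a second "page" cannot be read from the same stream object:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SkipLimitDemo {

    // Reads one "page" of the given stream using skip/limit (hypothetical helper).
    public static List<Integer> page(Stream<Integer> stream, int pageIndex, int pageSize) {
        return stream.skip((long) pageIndex * pageSize)
                     .limit(pageSize)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Integer> source = Stream.of(1, 2, 3, 4, 5);
        System.out.println(page(source, 0, 2)); // [1, 2]

        try {
            // Same stream object, second page: it has already been operated upon.
            System.out.println(page(source, 1, 2));
        } catch (IllegalStateException e) {
            System.out.println("Cannot reuse the stream: " + e.getMessage());
        }

        // Paging therefore needs a fresh stream per page, which re-reads
        // and skips over every earlier element of the source.
        System.out.println(page(Stream.of(1, 2, 3, 4, 5), 1, 2)); // [3, 4]
    }
}
```

Reading page k from a fresh stream costs a traversal of all earlier elements again, which is why grouping a single pass into batches fits this problem better than paging.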

I haven't found an existing method that does this, so I implemented one myself:

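import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Splits a stream (or iterator) into consecutive lists of at most
 * {@code partitionSize} elements. Elements are pulled lazily, one
 * partition at a time.
 */
public class Partitioner<E> implements Iterator<List<E>> {

    private final Iterator<E> iterator;
    private final int partitionSize;

    public static <T> Stream<List<T>> partition(final Stream<T> stream, final int partitionSize) {
        // Forward close() so resources held by the source stream are released.
        return new Partitioner<>(stream, partitionSize).asStream().onClose(stream::close);
    }

    public Partitioner(final Stream<E> stream, final int partitionSize) {
        this(stream.iterator(), partitionSize);
    }

    public Partitioner(final Iterator<E> iterator, final int partitionSize) {
        if (partitionSize <= 0) {
            // A size of 0 would otherwise produce an endless stream of empty lists.
            throw new IllegalArgumentException("partitionSize must be positive: " + partitionSize);
        }
        this.iterator = iterator;
        this.partitionSize = partitionSize;
    }

    @Override
    public boolean hasNext() {
        return this.iterator.hasNext();
    }

    @Override
    public List<E> next() {
        if (!hasNext()) {
            throw new NoSuchElementException("No more elements");
        }
        // Collect up to partitionSize elements; the last partition may be smaller.
        final List<E> result = new ArrayList<>(this.partitionSize);
        for (int i = 0; i < this.partitionSize && hasNext(); i++) {
            result.add(this.iterator.next());
        }
        return result;
    }

    public Stream<List<E>> asStream() {
        // Partitions arrive in source order and are never null.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(this, Spliterator.ORDERED | Spliterator.NONNULL), false);
    }

}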
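For comparison, a minimal sketch of the same batching idea built directly on the source stream's Spliterator instead of an Iterator. The class name SpliteratorPartition is hypothetical; only JDK APIs (Spliterators.AbstractSpliterator, StreamSupport.stream, Stream.onClose) are used. It is a sequential-only sketch and was not tuned for parallel streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class SpliteratorPartition {

    // Groups consecutive elements of `stream` into lists of at most `size`
    // elements. A batch is built only when downstream asks for the next one.
    public static <T> Stream<List<T>> partition(Stream<T> stream, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        Spliterator<T> source = stream.spliterator();
        Spliterator<List<T>> batches = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                List<T> batch = new ArrayList<>(size);
                // Pull elements until the batch is full or the source runs dry.
                while (batch.size() < size && source.tryAdvance(batch::add)) {
                    // the element was added by batch::add
                }
                if (batch.isEmpty()) {
                    return false; // source exhausted, no more batches
                }
                action.accept(batch);
                return true;
            }
        };
        // Closing the partitioned stream also closes the source stream.
        return StreamSupport.stream(batches, false).onClose(stream::close);
    }

    public static void main(String[] args) {
        System.out.println(partition(Stream.of(1, 2, 3, 4, 5, 6, 7), 3)
                .collect(Collectors.toList())); // [[1, 2, 3], [4, 5, 6], [7]]

        // An unbounded source works too, as long as downstream stops pulling.
        System.out.println(partition(Stream.iterate(0, i -> i + 1), 2)
                .limit(2)
                .collect(Collectors.toList())); // [[0, 1], [2, 3]]
    }
}
```

The design choice here is that tryAdvance both checks for and produces the next batch, so there is no separate hasNext() step; otherwise it behaves like the Partitioner above, including a shorter final batch.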
ST-DDT