I have a Stream of MyObject that I want to persist to a DB in batches (not one by one, but, say, 1000 at a time). So I want to do a transformation, something like

Stream<MyObject> ---> Stream<List<MyObject>>

where each List has a fixed size batchSize (except possibly the last one). Is there a way to do that with the standard Java 8 Stream API?

Tagir Valeev
Andrii Karaivanskyi
  • If your source is a `List` with fast random access, you can also use [this solution](http://stackoverflow.com/a/30072617/4856258) to get the stream (note that it also parallelizes nicely). – Tagir Valeev Aug 18 '15 at 15:12
  • Thank you, Tagir. I get the Stream while parsing a file, and I do not want to load the entire file into a List. So my source is a Stream. – Andrii Karaivanskyi Aug 18 '15 at 15:27

1 Answer

Edit: the original solution below does not work, because a Java stream cannot be operated on more than once, so `skip` and `limit` cannot be called repeatedly on the same stream instance. I ended up with simple processing like this:

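    // Needs: java.util.ArrayList, java.util.List,
    //        java.util.concurrent.atomic.AtomicInteger
    final AtomicInteger counter = new AtomicInteger();

    // Buffer for the current batch; only safe with a sequential stream.
    List<T> entityBatch = new ArrayList<>();

    entityStream.forEach(entity -> {
        // Buffer is full: flush it before adding the next entity.
        if (counter.intValue() == batchSize) {
            processBatch(entityBatch);

            entityBatch.clear();
            counter.set(0);
        }

        entityBatch.add(entity);
        counter.incrementAndGet();
    });

    // Flush the last, possibly smaller, batch.
    if (!entityBatch.isEmpty()) {
        processBatch(entityBatch);
    }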
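
If a `Stream<List<T>>` is still wanted, as in the question, one possible approach is to wrap the source stream's iterator. The following is only a sketch under assumptions: `BatchingSketch` and `batches` are hypothetical names, not part of the JDK, and the resulting stream is sequential and must be consumed by a single thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class BatchingSketch {

    // Hypothetical helper: lazily groups the elements of a stream into
    // lists of at most batchSize elements, preserving encounter order.
    public static <T> Stream<List<T>> batches(Stream<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Iterator<T> it = source.iterator();
        Iterator<List<T>> batchIt = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!it.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Pull up to batchSize elements from the underlying iterator.
                List<T> batch = new ArrayList<>();
                while (batch.size() < batchSize && it.hasNext()) {
                    batch.add(it.next());
                }
                return batch;
            }
        };
        // Sequential stream over the batch iterator; closing it closes the source.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batchIt, Spliterator.ORDERED | Spliterator.NONNULL),
                false)
            .onClose(source::close);
    }

    public static void main(String[] args) {
        // Seven elements with batchSize 3: two full batches and one of size 1.
        batches(Stream.of(1, 2, 3, 4, 5, 6, 7), 3)
            .forEach(System.out::println);
    }
}
```

Each batch is a fresh list, so a consumer such as `processBatch` may keep a reference to it, and only one batch is held in memory at a time while the file is parsed.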

Original solution (does not work, see the edit above): It looks like I found a way to do that:

    <T> Stream<List<T>> batchStream(Stream<T> stream, int batchSize) {
        return Stream.iterate(stream, s -> s.skip(batchSize)).map(s -> s.limit(batchSize).collect(toList()));
    }
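
The reason this fails can be checked in isolation: `skip` and `limit` return new stream instances, and the instance they were called on may not be used again. The sketch below (the class name `StreamReuseDemo` is hypothetical) shows the second operation on the same instance being rejected; the specification only says an implementation may throw `IllegalStateException` on reuse, but OpenJDK does so.

```java
import java.util.stream.Stream;

class StreamReuseDemo {

    // Returns true if a second operation on the same Stream instance is rejected.
    public static boolean secondUseFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
        // First operation: links 'stream' to a new pipeline stage.
        Stream<Integer> firstTwo = stream.limit(2);
        try {
            // Second operation on the already-linked instance.
            stream.skip(2);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails());
    }
}
```

In `batchStream`, the seed stream is consumed by `limit(...).collect(...)` for the first batch, so the later `s.skip(batchSize)` call on that same instance hits exactly this restriction.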
Andrii Karaivanskyi
  • If that works, then by pure accident. It contains the wrong assumption that `skip` and `limit` are guaranteed to modify the stream instead of returning a new instance; further, it attempts to use the stream more than once when it has more elements than `batchSize`… – Holger Aug 18 '15 at 09:38