85

My exact scenario is inserting data to database in batches, so I want to accumulate DOM objects then every 1000, flush them.

I implemented it by putting code in the accumulator to detect fullness then flush, but that seems wrong - the flush control should come from the caller.

I could convert the stream to a List then use subList in an iterative fashion, but that too seems clunky.

Is there a neat way to take action every n elements and then continue with the stream, while only processing the stream once?

Misha
Bohemian
    For a similar use case I did this: https://bitbucket.org/assylias/bigblue-utils/src/3f56d19777a0ebc5dc3b53d3c2ec8dc64fd2b28e/src/main/java/com/assylias/bigblue/utils/SplitProcessing.java?at=master - not exactly what you are asking for though. – assylias Dec 20 '14 at 23:47

11 Answers

34

Elegance is in the eye of the beholder. If you don't mind using a stateful function in groupingBy, you can do this:

AtomicInteger counter = new AtomicInteger();

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
    .values()
    .forEach(database::flushChunk);

This doesn't win any performance or memory usage points over your original solution because it will still materialize the entire stream before doing anything.
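For reference, here is a self-contained sketch of the same idea (the class name, example stream, and chunk size are made up; note that the HashMap produced by groupingBy makes no ordering guarantee for its values, although small sequential integer keys usually happen to come out in order):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChunkWithGroupingBy {
    public static void main(String[] args) {
        int chunkSize = 3;
        AtomicInteger counter = new AtomicInteger();
        IntStream.rangeClosed(1, 10).boxed()
                .collect(Collectors.groupingBy(x -> counter.getAndIncrement() / chunkSize))
                .values()                      // one List<Integer> per chunk index
                .forEach(System.out::println); // typically [1, 2, 3] [4, 5, 6] [7, 8, 9] [10]
    }
}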

If you want to avoid materializing the list, the Stream API will not help you. You will have to get the stream's iterator or spliterator and do something like this:

Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;

while (true) {
    List<Integer> chunk = new ArrayList<>(chunkSize);
    // fill the chunk with up to chunkSize elements, stopping early if the stream runs out
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) { }
    if (chunk.isEmpty()) break;
    database.flushChunk(chunk);
}
Jean-François Savard
Misha
31

Most of the answers above don't take advantage of stream benefits such as saving memory. You can use an iterator to resolve the problem:

Stream<List<T>> chunk(Stream<T> stream, int size) {
  Iterator<T> iterator = stream.iterator();
  Iterator<List<T>> listIterator = new Iterator<>() {

    public boolean hasNext() {
      return iterator.hasNext();
    }

    public List<T> next() {
      List<T> result = new ArrayList<>(size);
      for (int i = 0; i < size && iterator.hasNext(); i++) {
        result.add(iterator.next());
      }
      return result;
    }
  };
  return StreamSupport.stream(((Iterable<List<T>>) () -> listIterator).spliterator(), false);
}
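For illustration, a minimal usage sketch, assuming the chunk method above is in scope (the source stream here is made up):

Stream<Integer> source = IntStream.rangeClosed(1, 10).boxed();
chunk(source, 3).forEach(System.out::println);
// [1, 2, 3]
// [4, 5, 6]
// [7, 8, 9]
// [10]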
dmitryvim
    Very nice solution, +1. Just one improvement: you might want to return the stream as `return StreamSupport.stream(Spliterators.spliteratorUnknownSize(listIterator, Spliterator.ORDERED), false);`. – Peter Walser May 23 '21 at 23:02
  • @PeterWalser would you mind elaborating on what your suggestion does? Something to do with maintaining the chunked parts in order? – Jokkeri Oct 27 '21 at 07:17
22

If you have a Guava dependency in your project, you could do this:

StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);

See https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-

Update: use Iterators.partition on Stream.iterator(). Calling iterator() is a terminal operation, but it does NOT consume the Stream while creating the grouping Iterator. The result can be converted back to a Stream if needed.

Iterator<List<T>> listIterator = Iterators.partition(stream.iterator(), desiredSize);
Stream<List<T>> listStream = StreamSupport.stream(
    Spliterators.spliteratorUnknownSize(listIterator, Spliterator.ORDERED), false);

https://guava.dev/releases/17.0/api/docs/com/google/common/collect/Iterators.html#partition(java.util.Iterator,%20int)
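For illustration, a minimal end-to-end sketch of that update, assuming Guava's Iterators is imported (the stream and chunk size here are made up):

Stream<String> stream = Stream.of("a", "b", "c", "d", "e");
Iterator<List<String>> chunks = Iterators.partition(stream.iterator(), 2);
chunks.forEachRemaining(System.out::println); // prints [a, b], then [c, d], then [e]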

John B
user2814648
    This solution splits a list and not a stream. Useful but not what @Bohemian asked. – AlikElzin-kilaka Nov 28 '18 at 16:31
  • @AlikElzin-kilaka but you can make a stream with iterable (https://www.baeldung.com/java-iterable-to-stream). – mgcation Apr 22 '21 at 06:00
  • Seems that it would be more appropriate to use https://guava.dev/releases/17.0/api/docs/com/google/common/collect/Iterators.html#partition(java.util.Iterator,%20int) `Iterators.partition(stream.iterator(), size)`. This is non-consuming until the resulting `Iterator` is consumed, and could be converted back to a `Stream` if needed. – John B Sep 17 '22 at 10:55
18

You can create a stream of chunks (List<T>) from a stream of items and a given chunk size by:

  • grouping the items by the chunk index (element index / chunk size)
  • ordering the chunks by their index
  • mapping the sorted entries to their values (the chunks)

Code:

public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
    AtomicInteger index = new AtomicInteger(0);

    return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}

Example usage:

Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));

Output:

Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
Peter Walser
    Thanks, I used your solution. I removed the sort not needed in my case. – user1708042 Nov 19 '20 at 09:33
    Very nice solution – Kingshuk Mukherjee Feb 17 '21 at 21:33
    This solution will read the complete stream into a map before processing the chunks, rather than producing chunks "mid-stream". This might not be what you'd want/expect, especially for large streams which are probably the biggest use case for chunked processing. – Markus Rohlof May 23 '21 at 13:30
    @MarkusRohlof yes, you're absolutely right. I just tried to come up with a solution for larger (and potentially infinite) streams, only to find out it looks precisely the same as the one suggested by *dmitryvim*, so I really can recommend his solution. – Peter Walser May 23 '21 at 22:53
9

Using the StreamEx library, the solution would look like:

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;

StreamEx.of(stream)
        .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
        .forEach(chunk -> System.out.println(chunk));

Output:

[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]

groupRuns accepts a predicate that decides whether two adjacent elements should be in the same group.

It produces a group as soon as it finds the first element that does not belong to it.

Nazarii Bardiuk
  • This doesn't work for a single record. For example, an integer stream of simply [1] would fail. – wild_nothing Aug 04 '17 at 14:17
  • Stream of single item does work for me. What kind of error do you see? Could you post code that you tried? – Nazarii Bardiuk Aug 04 '17 at 14:42
  • The counter returns an incorrect value in the case there is one record. – wild_nothing Aug 05 '17 at 23:29
  • I believe in the case of one record the groupRuns() is never called as it expects two entries. Is there a solution if the stream only returns one result? The incrementAndGet on the counter in your example is never hit and returns 0 if the chunk size is 1. – wild_nothing Aug 11 '17 at 23:25
  • Grouping with chunk size 1 produces a stream of lists of size 1. I believe it is expected behavior. Can you explain what you are trying to achieve and what problem you have? Probably with a new Stack Overflow question - it is hard to share code in comments – Nazarii Bardiuk Aug 11 '17 at 23:31
  • Raised: https://stackoverflow.com/questions/45649990/streamex-grouping-into-lists-returns-an-incorrect-number-of-records – wild_nothing Aug 12 '17 at 11:58
  • This is nice, but only if the original stream is sequential, which I guess is the need of the original OP. Unfortunately, in my case I have a parallel stream source and nothing works, % chunkSize != 0 creates chunkSize chunks of average size total/chunkSize. – zakmck Nov 22 '19 at 23:52
6

Here is a simple wrapping spliterator implementation that groups source elements into chunks:

public class ChunkedSpliterator<T> implements Spliterator<List<T>> {
    private static final int PROMOTED_CHARACTERISTICS = Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.SIZED | Spliterator.IMMUTABLE | Spliterator.CONCURRENT;
    private static final int SELF_CHARACTERISTICS = Spliterator.NONNULL;

    private final Spliterator<T> src;
    private final int chunkSize;

    public ChunkedSpliterator(Spliterator<T> src, int chunkSize) {
        if (chunkSize < 1)
            throw new IllegalArgumentException("chunkSize must be at least 1");
        this.src = src;
        this.chunkSize = chunkSize;
    }

    public static <E> Stream<List<E>> chunkify(Stream<E> src, int chunkSize) {
        ChunkedSpliterator<E> wrap = new ChunkedSpliterator<>(src.spliterator(), chunkSize);
        return StreamSupport.stream(wrap, src.isParallel());
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        List<T> result = new ArrayList<>((int) Math.min(src.estimateSize(), chunkSize));
        for (int i = 0; i < chunkSize; ++i) {
            if (!src.tryAdvance(result::add))
                break;
        }
        if (result.isEmpty())
            return false;
        action.accept(result);
        return true;
    }

    @Override
    public Spliterator<List<T>> trySplit() {
        Spliterator<T> srcSplit = src.trySplit();
        return srcSplit == null ? null : new ChunkedSpliterator<>(srcSplit, chunkSize);
    }

    @Override
    public long estimateSize() {
        long srcSize = src.estimateSize();
        if (srcSize <= 0L) return 0L;
        if (srcSize == Long.MAX_VALUE) return Long.MAX_VALUE;
        return (srcSize - 1) / chunkSize + 1;
    }

    @Override
    public int characteristics() {
        return (src.characteristics() & PROMOTED_CHARACTERISTICS) | SELF_CHARACTERISTICS;
    }
}

There is a handy chunkify shortcut method to make things easier:

    Stream<T> input = ...;
    Stream<List<T>> chunked = ChunkedSpliterator.chunkify(input, 1000);

Although the call to Stream.spliterator() is a terminal operation, it does not forcibly exhaust the stream's source. So the stream can be processed gradually via its spliterator, without fetching all the data into memory, only one chunk at a time.

This spliterator preserves most of the input's characteristics. However, it is not SUBSIZED (chunks may be split in the middle), not SORTED (it is not obvious how to sort chunks even if the elements are sortable), and it produces only non-null chunks (although chunks may still contain null elements). I'm not 100% sure about CONCURRENT/IMMUTABLE, but it seems it should inherit these without problems. Also, the produced chunks may not be exactly the requested size, but they never exceed it.
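Tying it back to the original use case, consumption could look like this (loadDomObjects and database::flushChunk are hypothetical placeholders for whatever source and per-batch action the caller has):

Stream<DomObject> input = loadDomObjects();   // hypothetical lazy source
ChunkedSpliterator.chunkify(input, 1000)
        .forEach(database::flushChunk);       // each List<DomObject> holds at most 1000 elements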

In fact, I'm very surprised such a popular question had no answer introducing a custom spliterator for almost 7 (!) years.

Vasily Liaskovsky
  • Is there a reason you went with estimateSize() -> (srcSize-1)/batchSize+1, instead of maintaining a sequenceCounter and having (srcSize/batchSize)-sequenceCounter? – Dharmvir Tiwari Oct 11 '21 at 17:43
    Using any internal sequence counter might be bad idea, because accuracy of this spliterator's `estimateSize` depends on accuracy of the delegate and that *might* vary while it is being consumed. The delegate's implementation might return less accurate results at its start and more accurate closer to end. As `estimateSize` should reflect most accurate *current* state it's better to rely on delegate's size every time. – Vasily Liaskovsky Oct 11 '21 at 20:11
  • But (srcSize-1)/batchSize+1 would always give you an inaccurate value. batchSize of 5, srcSize of 100. For the first invocation of forEachRemaining() it should give you 20 but with the above calculation it would give you 16. With an internal seq counter, I only see an issue if we parallelise the stream, but that can be mitigated with a shared seq counter. – Dharmvir Tiwari Oct 11 '21 at 21:09
    (100 - 1) / 5 + 1 = 99 / 5 + 1 = 19 + 1 = 20. What's wrong? – Vasily Liaskovsky Oct 11 '21 at 21:12
  • This should be the accepted answer. – sakra Feb 25 '22 at 19:07
4

Looks like no, because creating chunks means reducing the stream, and reduce means termination. If you need to maintain the stream nature and process chunks without collecting all the data first, here is my code (it does not work for parallel streams):

private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
    return (data, element) -> {
        if (data.size() < chunkSize) {
            data.addAll(element);
            return data;
        } else {
            consumer.accept(data);
            return element; // in fact it's new data list
        }
    };
}

private static <T> Function<T, List<T>> createList(int chunkSize) {
    AtomicInteger limiter = new AtomicInteger(0);
    return element -> {
        limiter.incrementAndGet();
        if (limiter.get() == 1) {
            ArrayList<T> list = new ArrayList<>(chunkSize);
            list.add(element);
            return list;
        } else if (limiter.get() == chunkSize) {
            limiter.set(0);
        }
        return Collections.singletonList(element);
    };
}

And here is how to use it:

Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);

int chunkSize = 3;

// getInt is the static helper defined below
Stream.generate(StrTokenizer::getInt).limit(13)
        .map(createList(chunkSize))
        .reduce(processChunks(chunkProcessor, chunkSize))
        .ifPresent(chunkProcessor);

static Integer i = 0;

static Integer getInt()
{
    System.out.println("next");
    return i++;
}

It will print:

next next next next 0 1 2 next next next 3 4 5 next next next 6 7 8 next next next 9 10 11 12

The idea behind it is to create lists in the map operation following the 'pattern'

[1,,],[2],[3],[4,,]...

and merge (+process) that with reduce.

[1,2,3],[4,5,6],...

and don't forget to process the last 'trimmed' chunk with

.ifPresent(chunkProcessor);
Yura
3

As Misha rightfully said, elegance is in the eye of the beholder. I personally think an elegant solution would be to let the class that inserts into the database do this task, similar to a BufferedWriter. That way it does not depend on your original data structure and can even be used with multiple streams one after another.

I am not sure if this is exactly what you mean by having the code in the accumulator, which you thought was wrong. I don't think it is wrong, since existing classes like BufferedWriter work this way. You still get some flush control from the caller, by calling flush() on the writer at any point.

Something like the following code.

class BufferedDatabaseWriter implements Flushable {
    List<DomObject> buffer = new LinkedList<DomObject>();
    public void write(DomObject o) {
        buffer.add(o);
        if (buffer.size() >= 1000)
            flush();
    }
    public void flush() {
        //write buffer to database and clear it
    }
}

Now your stream gets processed like this:

BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
stream.forEach(o -> writer.write(o));
//if you have more streams stream2.forEach(o -> writer.write(o));
writer.flush();

If you want to work multithreaded, you could run the flush asynchronously. Taking from the stream can't go in parallel, but I don't think there is a way to count 1000 elements from a stream in parallel anyway.

You can also extend the writer to allow setting the buffer size in the constructor, or make it implement AutoCloseable and run it in a try-with-resources, and more. These are the nice things you get from a BufferedWriter.
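As a rough sketch of that last point, the writer could look like this (the configurable buffer size and the flush-on-close behaviour are assumptions layered on top of the class above):

class BufferedDatabaseWriter implements Flushable, AutoCloseable {
    private final int bufferSize;
    private final List<DomObject> buffer = new ArrayList<>();

    BufferedDatabaseWriter(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    public void write(DomObject o) {
        buffer.add(o);
        if (buffer.size() >= bufferSize)
            flush();
    }

    @Override
    public void flush() {
        // write buffer to database and clear it
        buffer.clear();
    }

    @Override
    public void close() {
        flush(); // the last partial batch is written here
    }
}

// usage: the trailing partial batch is flushed automatically when the block exits
try (BufferedDatabaseWriter writer = new BufferedDatabaseWriter(1000)) {
    stream.forEach(writer::write);
}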

findusl
  • You can also make it AutoCloseable and then do try (BufferedDatabaseWriter bdw = new BufferedDatabaseWriter()) { stream.forEach(o -> writer.write(o)); } – Iouri Goussev Mar 18 '21 at 22:35
0

You can use this class: https://github.com/1wpro2/jdk-patch/blob/main/FixedSizeSpliterator.java

Pass in the chunk size as the THRESHOLD:

new FixedSizeSpliterator(T[] values, int threshold)

Jackie
0

In case you need a very simple solution:

import java.util.List;

class Scratch {

    public static void main(String[] args) {
        List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        int chunkSize = 3;
        for (int i = 0; i < list.size() / chunkSize + Math.min(1, list.size() % chunkSize); i++) {
            List<Integer> subList = list.subList(i * chunkSize, Math.min(i * chunkSize + chunkSize, list.size()));
            System.out.println("subList = " + subList);
        }
    }
}

Output:

subList = [1, 2, 3]
subList = [4, 5, 6]
subList = [7, 8]
Borislav Markov
  • I just wonder why the Oracle team doesn't provide some utilities for this, while other languages have tons of utilities integrated into the syntax or core libraries. Java suffers from a lack of good utils. – Borislav Markov Sep 28 '22 at 12:21
0

I wanted a solution that didn't rely on mutable state, and came up with this:

var idents = IntStream.range(0, 1000).boxed().toList();
int max = 10;
var result =
    idents.stream()
        .collect(() -> new ArrayList<Set<Integer>>(),
            (ret, id) -> {
                if (ret.isEmpty() || ret.get(ret.size() - 1).size() == max) {
                    ret.add(new HashSet<>());
                }
                ret.get(ret.size() - 1).add(id);
            },
            ArrayList::addAll);

It even works with a parallel stream, but in that case there might be more than one chunk with fewer than the maximum number of entries.

Adrian Mole