
I have a finite stream of n entries. Its count is unknown in advance. The data size is around 10 GB, too big to fit in RAM, so I cannot read it as a whole. What would be a way to process that stream in chunks of 100000 entries each?

The stream is declared as

    Stream<?> blocks

so methods such as `subList` are not available to me.

I imagine it would look something like this in code:

    IntStream
            .range(0, Integer.MAX_VALUE)
            .filter(s -> s % 100000 == 0)
            .mapToObj(s -> blocks
                    .skip(s)
                    .collect(Collectors.toList())
                    .forEach(MyClass::doSomething)
            );

But then I get an error, because limit is a terminal operator and it closes the stream. Is there some workaround?

lapkritinis
  • Possible duplicate of [Java 8 Stream IllegalStateException: Stream has already been operated on or closed](https://stackoverflow.com/questions/27990451/java-8-stream-illegalstateexception-stream-has-already-been-operated-on-or-clos) – lackerman Sep 06 '17 at 11:11
  • 1
    I understand why I get that error, but it doesn't help me to come up with workaround. – lapkritinis Sep 06 '17 at 11:14
  • 1
    “*because limit is terminal operator*” `limit` is not a terminal operation and there is no `limit` operation in your code at all. – Holger Sep 06 '17 at 12:07
  • 1
    Besides, what is `MyClass::doSomething`? Chaining `.forEach(MyClass::doSomething)` after `.collect(Collectors.toList())` would suggest that this method is invoked for single elements anyway, so where’s the need for additional batching here? `blocks.forEach(MyClass::doSomething)` would be sufficient to process one element at a time without loading everything into memory at once. – Holger Sep 06 '17 at 12:25

2 Answers


If

    IntStream
            .range(0, Integer.MAX_VALUE)
            .filter(s -> s % 100000 == 0)
            .mapToObj(s -> blocks
                    .skip(s)
                    .collect(Collectors.toList())
                    .forEach(MyClass::doSomething)
            );

compiles without errors, the method `doSomething` must be a method receiving a single element of `blocks` at a time, as that’s what `List.forEach(…)` does: it calls the consumer for each element individually. (This ignores the fact that `List.forEach` returns `void` and therefore can’t satisfy the `mapToObj(…)` of the outer stream.) In this case, there is no benefit in “batching” at all and you can just use

    blocks.forEachOrdered(MyClass::doSomething);

as this will load one element at a time and allow each element to be garbage collected after processing, while the next element is being processed (unless `doSomething` stores a reference somewhere).
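
As a concrete illustration of that point, here is a minimal sketch assuming, purely hypothetically, that the entries come from a large text file (the question does not say what backs `blocks`). The whole pipeline runs at roughly constant memory:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    class OneAtATime {
        public static void main(String[] args) throws IOException {
            // "entries.txt" is a hypothetical stand-in for the 10 GB input
            try (Stream<String> blocks = Files.lines(Paths.get("entries.txt"))) {
                // Files.lines is lazy: each line is read, handed to the consumer,
                // and becomes eligible for garbage collection before the next one
                blocks.forEachOrdered(OneAtATime::doSomething);
            }
        }

        static void doSomething(String entry) {
            System.out.println(entry.length()); // stand-in for the real work
        }
    }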

Your attempt to collect 100000 elements into a `List` before calling `doSomething` does not improve performance, as the stream will still load each element one after the other and you are still processing one element after another. It only prevents the garbage collection of up to 99999 elements until the 100000th has been processed, which is no advantage.
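
For completeness: if `doSomething` really did need a whole `List` per call (which nothing in the question’s code indicates), batches could be built by pulling from the stream’s iterator, so the stream is still consumed lazily and only one batch is held at a time. This is only a sketch under that assumption; `doSomethingWithBatch` is a hypothetical method:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.stream.Stream;

    class Batching {
        static <T> void processInBatches(Stream<T> stream, int batchSize) {
            Iterator<T> it = stream.iterator(); // pulls elements lazily
            List<T> batch = new ArrayList<>(batchSize);
            while (it.hasNext()) {
                batch.add(it.next());
                if (batch.size() == batchSize) {
                    doSomethingWithBatch(batch);
                    batch.clear(); // let processed elements become collectible
                }
            }
            if (!batch.isEmpty()) {
                doSomethingWithBatch(batch); // final, possibly partial batch
            }
        }

        static <T> void doSomethingWithBatch(List<T> batch) {
            System.out.println("batch of " + batch.size());
        }
    }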

Holger

It would appear that you have to use a `Spliterator`; have a look at the Java docs, as suggested in the answers to previous questions.

The simplified example below produces output in chunks of 10 blocks whilst keeping the stream open.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.stream.IntStream;
    import java.util.stream.Stream;

    Stream<Block<Integer>> blocks = IntStream
      .range(0, 1000)
      .mapToObj(Block::new);

    // spliterator() is a terminal operation, but it lets us pull elements on demand
    Spliterator<Block<Integer>> split = blocks.spliterator();
    int chunkSize = 10;

    while (true) {
      List<Block<Integer>> chunk = new ArrayList<>(chunkSize);
      // pull up to chunkSize elements, processing each one as it arrives
      for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++) {
        chunk.get(i).doSomething();
      }
      if (chunk.isEmpty()) break; // stream exhausted
      System.out.println();       // newline after each completed chunk
    }

    // Simple wrapper standing in for the real stream elements
    class Block<T> {
      private final T value;

      Block(T value) {
        this.value = value;
      }

      void doSomething() {
        System.out.print("v: " + value);
      }
    }
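
With the question’s numbers, the same loop would run with `chunkSize = 100000`; only the current chunk is ever held in memory, so the 10 GB input never has to fit in RAM at once. Note that `spliterator()`, like `iterator()`, is a terminal operation, so the stream cannot be reused afterwards, but the spliterator itself keeps pulling elements lazily.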
lackerman