OK, thanks for all the contributions. I combined what was said and implemented what I need; maybe the implementation clarifies what I was aiming for in the first place.
I created two classes, RepositoryCollector and RepositorySpliterator.
public class RepositoryCollector<T> implements Collector<T, Tuple2<Integer, List<T>>, Integer> {

    private JpaRepository<T, ?> repository;
    private int threshold;

    @Override
    public BinaryOperator<Tuple2<Integer, List<T>>> combiner() {
        return (listTuple, itemsTuple) -> {
            // merge the partial results of two workers
            List<T> list = listTuple._2;
            List<T> items = itemsTuple._2;
            list.addAll(items);
            int sum = listTuple._1 + itemsTuple._1;
            // once enough entities have piled up, persist them in one batch
            if (list.size() >= this.threshold) {
                this.repository.save(list);
                this.repository.flush();
                list = new LinkedList<>();
            }
            return new Tuple2<>(sum, list);
        };
    }
}
I omit the other functions that are required for a Collector, since all the relevant information is present in the combiner; a rough sketch of what they could look like follows directly below. The same holds for the Spliterator, whose remaining methods are sketched further down.
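The omitted pieces boil down to roughly the following. Treat this as a sketch rather than the exact code: it assumes Tuple2 is an immutable pair (javaslang/vavr style), and the finisher simply saves whatever the combiner has not flushed yet.

// sketch of the constructor and the remaining Collector methods
// (additionally needs java.util.Set, java.util.EnumSet and java.util.function imports)
public RepositoryCollector(JpaRepository<T, ?> repository, int threshold) {
    this.repository = repository;
    this.threshold = threshold;
}

@Override
public Supplier<Tuple2<Integer, List<T>>> supplier() {
    // every worker starts with a count of zero and an empty buffer
    return () -> new Tuple2<>(0, new LinkedList<>());
}

@Override
public BiConsumer<Tuple2<Integer, List<T>>, T> accumulator() {
    // the tuple is immutable, so only the buffered list is touched here;
    // counting and flushing happen in combiner() and finisher()
    return (tuple, item) -> tuple._2.add(item);
}

@Override
public Function<Tuple2<Integer, List<T>>, Integer> finisher() {
    return tuple -> {
        // persist whatever has not been flushed yet and return the carried
        // count plus the leftovers
        if (!tuple._2.isEmpty()) {
            this.repository.save(tuple._2);
            this.repository.flush();
        }
        return tuple._1 + tuple._2.size();
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.noneOf(Characteristics.class);
}

And here is the Spliterator that feeds the stream: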
public class RepositorySpliterator<T> implements Spliterator<T> {

    private Slice<T> slice;
    private Function<Pageable, Slice<T>> getSlice;
    private Iterator<T> sliceIterator;

    public RepositorySpliterator(Pageable pageable, Function<Pageable, Slice<T>> getSlice) {
        this.getSlice = getSlice;
        this.slice = this.getSlice.apply(pageable);
        this.sliceIterator = slice.iterator();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (sliceIterator.hasNext()) {
            action.accept(sliceIterator.next());
            return true;
        } else if (slice.hasNext()) {
            // the current slice is exhausted, so fetch the next one from the repository
            this.slice = getSlice.apply(slice.nextPageable());
            this.sliceIterator = this.slice.iterator();
            if (sliceIterator.hasNext()) {
                action.accept(sliceIterator.next());
                return true;
            }
        }
        return false;
    }

    public Stream<T> getStream(boolean parallel) {
        return StreamSupport.stream(this, parallel);
    }
}
As you can see, I put in a helper function to generate the Stream I need. Maybe that is a tad sloppy, but... meh.
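The remaining Spliterator methods (trySplit, estimateSize and characteristics) are not shown above either; they could look roughly like this. The batch size in trySplit is an arbitrary pick on my side, chosen so that a parallel pipeline actually gets chunks it can hand to other threads:

@Override
public Spliterator<T> trySplit() {
    // hand out a chunk of already-fetched entities so the parallel pipeline
    // has something to distribute; the rest stays with this spliterator
    List<T> batch = new ArrayList<>(); // needs java.util.List / java.util.ArrayList
    while (batch.size() < 1000 && sliceIterator.hasNext()) {
        batch.add(sliceIterator.next());
    }
    return batch.isEmpty() ? null : batch.spliterator();
}

@Override
public long estimateSize() {
    // the total number of matching rows is unknown up front
    return Long.MAX_VALUE;
}

@Override
public int characteristics() {
    return NONNULL;
}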
So now I just need a few lines of code in my mapping classes to get things going.
public void start(Timestamp startTimestamp, Timestamp endTimestamp) {
    new RepositorySpliterator<>(
            new PageRequest(0, 10000),
            pageable -> sourceRepository.findAllBetween(startTimestamp, endTimestamp, pageable))
        .getStream(true)
        .map(entity -> mapToTarget(endTimestamp, entity))
        .collect(new RepositoryCollector<>(targetRepository, 1000));
}
The mapper fetches 10,000 entities at a time from the source and pours them into the stream pool, where they are mapped and stored. Whenever one of the streams runs out of new entities, a fresh batch is fetched and fed into the same stream pool.
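A side note on the stream pool: with getStream(true) the work runs on the common ForkJoinPool. If the number of threads hitting the database ever needs to be capped, a common (though not officially documented) trick is to kick the pipeline off from inside a dedicated pool, for example:

// hypothetical wrapper around start(); the parallelism of 4 is an arbitrary example
// (needs java.util.concurrent.ForkJoinPool)
ForkJoinPool pool = new ForkJoinPool(4);
try {
    pool.submit(() -> start(startTimestamp, endTimestamp)).join();
} finally {
    pool.shutdown();
}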
If there are glaring errors in my implementation, feel free to comment and improve!