I have a piece of code that fetches 200k entries from the database and then processes them in batches. I noticed that each call to processBatch (5000 entries per batch) takes around 3 minutes, so the whole run (40 batches) takes about 2 hours.
I am already using a database index on entryId to speed up finding entries. What I am wondering is whether it is possible to run this batch processing in parallel.
If anything is unclear, I will gladly rework the question.
Stream<X> streams = repository
        .streamBySourceFileCreationDate(getDateFromFilename(sourceFileName));
int batchSize = properties.getBuildOutput().getSize();
List<OutX> built = new ArrayList<>();
Iterators.partition(streams.iterator(), batchSize)
        .forEachRemaining(list -> processBatch(list, built));
private void processBatch(List<X> list, List<OutX> built) {
    list.stream().map(m -> {
        // One database lookup per entry; entries without a latest update are skipped.
        X xEntry = repository
                .findLatestUpdateById(m.getEntryId());
        if (xEntry == null) {
            return null;
        }
        OutX out = buildXOut(xEntry);
        return out;
    }).filter(Objects::nonNull).forEach(built::add);
}
UPDATE 1: I changed the first part as shown below, and this reduced the processing time to around 25 minutes. Are there any other suggestions for making this faster?
List<X> lists = repository
        .findBySourceFileCreationDate(getDateFromFilename(sourceFileName));
int batchSize = properties.getBuildOutput().getSize();
List<OutX> built = new ArrayList<>(); // Should I use a Vector here for thread safety?
ForkJoinPool forkJoinPool = new ForkJoinPool();
List<ForkJoinTask<?>> tasks = new ArrayList<>();
for (int i = 0; i < lists.size(); i += batchSize) {
    int endIndex = Math.min(i + batchSize, lists.size());
    List<X> batch = lists.subList(i, endIndex);
    ForkJoinTask<?> task = forkJoinPool.submit(() -> processBatch(batch, built));
    tasks.add(task);
}
tasks.forEach(ForkJoinTask::join);
forkJoinPool.shutdown();
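
On the thread-safety question in the code comment above: ArrayList is not safe for concurrent add calls, so sharing built across tasks can lose elements or throw. One possible way around this, shown here only as a minimal sketch and not as a definitive implementation, is to let each batch return its own list and merge the results after joining. Integer and String stand in for X and OutX, and lookupAndBuild is a hypothetical placeholder for the repository lookup plus buildXOut.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;

public class ParallelBatchSketch {

    // Hypothetical stand-in for findLatestUpdateById + buildXOut:
    // negative ids simulate "entry not found" and yield null.
    static String lookupAndBuild(Integer entryId) {
        return entryId < 0 ? null : "out-" + entryId;
    }

    // Each batch collects into its own list, so nothing mutable is shared between threads.
    static List<String> processBatch(List<Integer> batch) {
        return batch.stream()
                .map(ParallelBatchSketch::lookupAndBuild)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    static List<String> processAll(List<Integer> entries, int batchSize) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<ForkJoinTask<List<String>>> tasks = new ArrayList<>();
            for (int i = 0; i < entries.size(); i += batchSize) {
                int endIndex = Math.min(i + batchSize, entries.size());
                List<Integer> batch = entries.subList(i, endIndex);
                // Explicit Callable so the task carries the batch result back.
                Callable<List<String>> job = () -> processBatch(batch);
                tasks.add(pool.submit(job));
            }
            // Merge on the calling thread only, in submission order.
            List<String> built = new ArrayList<>();
            for (ForkJoinTask<List<String>> task : tasks) {
                built.addAll(task.join());
            }
            return built;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> entries = List.of(1, -2, 3, 4, -5, 6, 7);
        System.out.println(processAll(entries, 3));
    }
}
```

Because the merge happens on a single thread after each join, the result keeps the original batch order and no synchronized collection such as Vector is required. This addresses only the shared-list concern; the sketch still assumes one lookup per entry, as in the code above.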