
I have a piece of code that fetches 200k entries from the database and then processes them in batches. What I noticed is that processBatch (5,000 entries per batch) takes around 3 minutes per batch, so the whole run takes roughly 2 hours.

I already have a database index on entryId to speed up finding entries. What I am wondering is whether it is possible to run this batch processing in parallel somehow.

If anything is unclear, I would gladly rework the question.


        Stream<X> streams = repository
                .streamBySourceFileCreationDate(getDateFromFilename(sourceFileName));

        int batchSize = properties.getBuildOutput().getSize();
        List<OutX> built = new ArrayList<>();

        Iterators.partition(streams.iterator(), batchSize)
                .forEachRemaining(list -> processBatch(list, built));

    private void processBatch(List<X> list, List<OutX> built) {
        list.stream().map(m -> {
                X xEntry = repository
                        .findLatestUpdateById(m.getEntryId());
                if (xEntry == null) {
                    return null;
                }
                OutX out = buildXOut(xEntry);
                return out;
        }).filter(Objects::nonNull).forEach(built::add);
    }
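One likely bottleneck is that processBatch issues one findLatestUpdateById query per entry (an N+1 pattern), so each 5,000-entry batch means 5,000 round trips. If the repository can expose a bulk finder (e.g. a hypothetical findLatestUpdateByIdIn(ids), which is not in the original code), each batch collapses into a single query. A minimal, self-contained sketch with the database stubbed as a map:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class BatchLookupDemo {
    // Stand-in for a bulk repository query such as the hypothetical
    // findLatestUpdateByIdIn(ids). Here it just fabricates rows for even ids,
    // so odd ids play the role of entries with no matching latest update.
    static Map<Integer, String> dbLookup(Collection<Integer> ids) {
        Map<Integer, String> rows = new HashMap<>();
        for (int id : ids) {
            if (id % 2 == 0) {
                rows.put(id, "row-" + id);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Integer> batch = Arrays.asList(1, 2, 3, 4, 5, 6);

        // One query for the whole batch instead of one query per entry.
        Map<Integer, String> found = dbLookup(batch);

        List<String> built = batch.stream()
                .map(found::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        System.out.println(built); // prints [row-2, row-4, row-6]
    }
}
```

Whether this helps depends on the repository API; with Spring Data, a derived `...IdIn(Collection<Long> ids)` query method is the usual way to get a single `WHERE id IN (...)` query per batch.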

UPDATE 1: I changed the first part as shown below, which reduced the processing time to around 25 minutes. Any other suggestions on how to make this faster?

        List<X> lists = repository
                .findBySourceFileCreationDate(getDateFromFilename(sourceFileName));

        int batchSize = properties.getBuildOutput().getSize();
        List<OutX> built = new ArrayList<>(); // Should I maybe use a Vector here for thread safety?


        ForkJoinPool forkJoinPool = new ForkJoinPool();
        List<ForkJoinTask<?>> tasks = new ArrayList<>();

        for (int i = 0; i < lists.size(); i += batchSize) {
            int endIndex = Math.min(i + batchSize, lists.size());
            List<X> batch = lists.subList(i, endIndex);
            ForkJoinTask<?> task = forkJoinPool.submit(() -> processBatch(batch, built));
            tasks.add(task);
        }

        tasks.forEach(ForkJoinTask::join);
        forkJoinPool.shutdown();
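To answer the inline question: yes, once several tasks append to the shared built list concurrently, it must be thread safe; a plain ArrayList can silently lose elements or throw. A minimal, self-contained sketch (plain integers instead of the X/OutX entities, which are not shown here) using Collections.synchronizedList:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class SyncListDemo {
    // Fill a shared list from several ForkJoin tasks. The wrapper from
    // Collections.synchronizedList makes the concurrent add() calls safe;
    // with a plain ArrayList, elements can be lost or the list corrupted.
    static List<Integer> parallelFill(int total, int batchSize) {
        List<Integer> built = Collections.synchronizedList(new ArrayList<>());
        ForkJoinPool pool = new ForkJoinPool();
        List<ForkJoinTask<?>> tasks = new ArrayList<>();
        for (int i = 0; i < total; i += batchSize) {
            final int start = i;
            final int end = Math.min(start + batchSize, total);
            tasks.add(pool.submit(() -> {
                for (int j = start; j < end; j++) {
                    built.add(j);
                }
            }));
        }
        tasks.forEach(ForkJoinTask::join);
        pool.shutdown();
        return built;
    }

    public static void main(String[] args) {
        System.out.println(parallelFill(10_000, 1_000).size()); // prints 10000
    }
}
```

An alternative with less lock contention is to have each task build and return its own List&lt;OutX&gt; and merge them after join(), or to use a parallel stream with Collectors.toList(), which does the merging for you.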
cUser
    You are taking a full stream and iterating over it to turn it into a bunch of lists, and then serially, iterating over each list to call findLatestUpdateById and buildXOut. There are any number of ways to parallelize this, possibly by moving the processBatch to a thread to process instead of having the forEachRemaining wait for it to finish, or getting a list instead. Note though your List built is not thread safe, so you can't have concurrent processes adding to it - https://stackoverflow.com/questions/60095513/why-is-list-parallelstream-foreach-not-processing-all-the-elements-in-the-li – Chris Jun 22 '23 at 17:15
  • Thank you for the suggestion. Maybe you can have a look at the updated question; perhaps you have an even better approach :) – cUser Jun 23 '23 at 16:24

1 Answer


One of many possible options:

List<X> readIn = repository.findBySourceFileCreationDate(getDateFromFilename(sourceFileName));
List<OutX> built = readIn.parallelStream()
    .map(m -> 
        {
          X xEntry = repository.findLatestUpdateById(m.getEntryId());
          if (xEntry == null) {
            return null;
          }
          return buildXOut(xEntry);
        })
    .filter(Objects::nonNull)
    .collect(Collectors.toList());
Chris
  • Interestingly enough, with this piece of code it is still as slow as before :( – cUser Jun 23 '23 at 16:16
  • I updated the question with a version that made the code faster; maybe you have a better idea :) – cUser Jun 23 '23 at 16:24
  • You need to profile what is taking the time. It could be the initial query, in which case you might try pagination - determine the number of records, and then concurrently read in pages. – Chris Jun 23 '23 at 17:51
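A self-contained sketch of that pagination idea: count the records, split them into pages, read the pages concurrently, and flatten the results in page order. Here fetchPage is a stand-in for a real paged repository query (e.g. one taking Spring Data's PageRequest); the method name and the fabricated rows are assumptions, not from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PagedReadDemo {
    // Stand-in for a paged repository query such as
    // repository.findBySourceFileCreationDate(date, PageRequest.of(page, size));
    // here it just fabricates the ids belonging to that page.
    static List<Integer> fetchPage(int page, int size, int total) {
        int from = page * size;
        int to = Math.min(from + size, total);
        List<Integer> out = new ArrayList<>();
        for (int id = from; id < to; id++) {
            out.add(id);
        }
        return out;
    }

    public static void main(String[] args) {
        int total = 200_000;  // e.g. from a count query on the same criteria
        int pageSize = 5_000;
        int pages = (total + pageSize - 1) / pageSize;

        // Read all pages concurrently, then flatten the results in page order.
        List<CompletableFuture<List<Integer>>> futures = IntStream.range(0, pages)
                .mapToObj(p -> CompletableFuture.supplyAsync(() -> fetchPage(p, pageSize, total)))
                .collect(Collectors.toList());

        List<Integer> all = futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList());

        System.out.println(all.size()); // prints 200000
    }
}
```

Whether concurrent page reads actually help depends on the database and connection pool size; profiling first, as suggested above, is the safer order of operations.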