
I have a for loop that iterates over a list of collections. Inside the loop, select and update queries run against the current collection only, independently of the other collections. Since each collection has a lot of data to process, I would like to parallelize the loop.

The code snippet looks something like this:

//Some variables that are used within the for loop logic
for (String collection : collections) {
    //Select queries on collection
    //Update queries on collection
}

How can I achieve this in Java?

Vishal Roy

3 Answers


You can use the parallelStream() method (available since Java 8):

collections.parallelStream().forEach((collection) -> {
    //Select queries on collection
    //Update queries on collection
});

More information about streams.


Another way to do it is to use Executors:

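    final ExecutorService exec = Executors.newFixedThreadPool(collections.size());
    try
    {
        for (final String collection : collections)
        {
            exec.submit(() -> {
                // Select queries on collection
                // Update queries on collection
            });
        }

        // Stop accepting new tasks; without this call awaitTermination can never return true.
        exec.shutdown();

        // Wait for the jobs to finish. Choose a timeout that fits the workload.
        final boolean terminated = exec.awaitTermination(10, TimeUnit.MINUTES);
        if (!terminated)
        {
            exec.shutdownNow(); // Interrupt the tasks that are still running.
        }

    } catch (final InterruptedException e)
    {
        exec.shutdownNow();
        Thread.currentThread().interrupt(); // Preserve the interrupt status.
    }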

This example is more powerful, since you can easily find out when the jobs are done, force termination, and more.
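As a minimal sketch of the "know when the job is done" part, ExecutorService.invokeAll blocks until every submitted task has completed. The names InvokeAllSketch, processAll and processCollection are hypothetical; processCollection only stands in for the real select/update queries and returns a dummy value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Hypothetical stand-in for the select/update queries on one collection.
    static int processCollection(String collection) {
        return collection.length();
    }

    // Runs one task per collection and returns only after all of them have finished.
    static List<Integer> processAll(List<String> collections, int threads) {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String collection : collections) {
                tasks.add(() -> processCollection(collection));
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done; futures are returned in task order.
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            exec.shutdown(); // no new tasks are accepted; idle workers are released
        }
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("users", "orders", "logs"), 3));
    }
}
```

Because the futures come back in the same order as the tasks, results can be matched to collections by index.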

Ealrann
  • @Visgal right, however consider [this answer](https://stackoverflow.com/a/20375622/3666539) as parallelStream may not _always_ be the best solution – noiaverbale Mar 28 '19 at 11:15
  • `for (final Collection> collection : collections)` this step is giving me an error: **Type mismatch: cannot convert from element type String to Collection>** – Vishal Roy Mar 28 '19 at 13:17
  • Ok, I thought you were using some List of Collection>, my bad. Just replace by String. I edited the post. – Ealrann Mar 28 '19 at 13:28

final int numberOfThreads = 32;

final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);

// List to store the 'handles' (Futures) for all tasks:
final List<Future<MyResult>> futures = new ArrayList<>();

// Schedule one (parallel) task per String from "collections":
for(final String str : collections) {
  futures.add(executor.submit(() -> { return doSomethingWith(str); }));
}


// Wait until all tasks have completed:
for ( Future<MyResult> f : futures ) {
  MyResult aResult = f.get(); // Will block until the result of the task is available.
  // Optionally do something with the result...
}

executor.shutdown(); // Release the threads held by the executor.

// At this point all tasks have ended and we can continue as if they were all executed sequentially

Adjust numberOfThreads as needed to achieve the best throughput. More threads tend to utilize the local CPU better, but may cause more overhead at the remote end. To get good local CPU utilization, you want (many) more threads than CPU cores so that, whenever one thread has to wait, e.g. for a response from the DB, another thread can be scheduled on the CPU.
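One common rule of thumb for a starting value (not part of the answer above, and no substitute for measuring) is cores × (1 + wait time / compute time). The sketch below assumes you have rough per-task timings; PoolSizeSketch, estimateThreads and the sample timings are hypothetical.

```java
public class PoolSizeSketch {

    // Rule-of-thumb estimate: cores * (1 + waitMillis / computeMillis).
    // waitMillis: time a task spends blocked (e.g. waiting for the DB).
    // computeMillis: time a task spends actually using the CPU.
    static int estimateThreads(int cores, double waitMillis, double computeMillis) {
        if (cores < 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("cores >= 1, waitMillis >= 0, computeMillis > 0 required");
        }
        long estimate = Math.round(cores * (1 + waitMillis / computeMillis));
        return (int) Math.max(1L, estimate);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed timings for illustration only: 90 ms waiting on the DB, 10 ms of CPU work.
        System.out.println("suggested starting pool size: " + estimateThreads(cores, 90, 10));
    }
}
```

Treat the result as a first guess for numberOfThreads and tune it against the load the database can actually sustain.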

JimmyB

There are a number of questions that you need to ask yourself to find the right answer:

If I have as many threads as the number of my CPU cores, would that be enough?

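Using parallelStream() will give you roughly as many threads as your CPU cores: parallel streams run on the common ForkJoinPool, whose default parallelism is the number of available processors minus one, and the calling thread takes part in the work as well.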
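To check what this means on a given machine, a small sketch like the following prints both numbers. The class and method names are hypothetical; the calls themselves are standard JDK API.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSketch {

    // Parallelism of the common pool that parallel streams use by default.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors:    " + cores);
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

The default can be overridden at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism, but the common pool is shared by the whole application, so a dedicated executor is usually the safer choice for blocking DB work.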

Will parallelizing the loop give me a performance boost or is there a bottleneck on the DB?

You could spin up 100 threads, processing in parallel, but this doesn't mean that you will do things 100 times faster, if your DB or the network cannot handle the volume. DB locking can also be an issue here.

Do I need to process my data in a specific order?

If you have to process your data in a specific order, this may limit your choices. E.g. forEach() doesn't guarantee that the elements of your collection will be processed in a specific order, but forEachOrdered() does (with a performance cost).
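The difference can be seen in a small sketch that records the order in which elements are visited. OrderingSketch and its methods are hypothetical names; the list of collection names is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OrderingSketch {

    // forEachOrdered visits elements in the encounter order of the source list,
    // even on a parallel stream.
    static List<String> visitOrdered(List<String> collections) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        collections.parallelStream().forEachOrdered(seen::add);
        return seen;
    }

    // forEach on a parallel stream may visit elements in any order.
    static List<String> visitUnordered(List<String> collections) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        collections.parallelStream().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> collections = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println("forEachOrdered: " + visitOrdered(collections));
        System.out.println("forEach:        " + visitUnordered(collections)); // order may vary between runs
    }
}
```

Since the collections in the question are independent of each other, forEach is likely sufficient there; forEachOrdered matters only when later work depends on earlier work.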

Is my datasource capable of fetching data reactively?

There are cases when your datasource can provide data in the form of a stream. In that case, you can process this stream using a technology such as RxJava or WebFlux. This would enable you to take a different approach to your problem.

Having said all of the above, you can choose the approach (executors, RxJava, etc.) that best fits your purpose.

Sofo Gial