
I am very new to Java and I want to parallelize a nested for loop using an ExecutorService or any other mechanism in Java. I want to create a fixed number of threads so that the threads do not take over the CPU completely.

    for(SellerNames sellerNames : sellerDataList) {
        for(String sellerName : sellerNames) {
        //getSellerAddress(sellerName)
        //parallize this task
        }
    }

The size of sellerDataList is 1000 and the size of each sellerNames is 5000.

Now I want to create 10 threads and assign an equal chunk of the work to each thread. That is, for the i-th entry of sellerDataList, the first thread should get the addresses for the first 500 names, the second thread should get the addresses for the next 500 names, and so on.
What is the best way to do this?

Jhutan Debnath
  • You can use a fixed-size thread pool and submit one task for each sellerNames. See https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool-int- Once you have submitted all tasks, call [shutdown](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown--) on the executor, then wait for it to finish with [awaitTermination](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-) – Fildor Apr 27 '17 at 10:59
  • maybe `Collection`s `.stream().parallel()` form is an option? – Timothy Truckle Apr 27 '17 at 11:06
  • I have tried Collections .stream().parallel(). It is limited by number of cores. – Jhutan Debnath Apr 27 '17 at 11:10
  • The syntax is wrong. Where you wrote `foreach` it should be `for`. – Lew Bloch Apr 27 '17 at 11:35
  • @Jhutan Debnath, "limited by [the] number of cores" isn't actually much of a limitation. "parallize" should be spelled "parallelize". – Lew Bloch Apr 27 '17 at 11:38
  • take a look here http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream – Yamahar1sp Apr 27 '17 at 12:00
  • Before going too far down this route, you may want to find out more about `getselleraddress`. If that's a database query, making a lot of such calls from a pool of threads isn't necessarily going to be faster. Same if that involves net traffic - the NIC becomes the bottleneck. Basically, is there genuinely an opportunity for a performance increase by parallelising this code? – bazza Apr 27 '17 at 12:09
  • @bazza getselleraddress() is an API call, and each call takes 100 ms to complete, but the API can process 200 requests per second. So I need multiple threads for the best performance. – Jhutan Debnath Apr 27 '17 at 12:19

2 Answers


There are two ways to run it in parallel: streams and executors.

Using streams

You can use parallel streams and leave the rest to the JVM. In this case you have little control over what runs when. On the other hand, your code will be easy to read and maintain:

    sellerDataList.stream().forEach(sellerNames -> {
        Stream<String> stream = StreamSupport.stream(sellerNames.spliterator(), true); // true means use parallel stream
        stream.forEach(sellerName -> {
            getSellerAddress(sellerName);
        });
    });
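
The nested loops can also be flattened into a single parallel stream, so that the work is split across all names at once instead of one inner collection at a time. The following is only a sketch under assumptions: `SellerNames` is modelled as a plain `List<String>`, and `getSellerAddress` is a hypothetical stand-in that returns a string instead of calling the real API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlattenedParallelStream {

    // Hypothetical stand-in for the real API call from the question.
    static String getSellerAddress(String sellerName) {
        return "address of " + sellerName;
    }

    // Flattens the nested lists into one stream and looks up every name in parallel.
    // collect(toList()) keeps the encounter order, even for a parallel stream.
    static List<String> lookupAll(List<List<String>> sellerDataList) {
        return sellerDataList.parallelStream()
                .flatMap(List::stream)
                .map(FlattenedParallelStream::getSellerAddress)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> sellerDataList = Arrays.asList(
                Arrays.asList("alice", "bob"),
                Arrays.asList("carol"));
        System.out.println(lookupAll(sellerDataList));
    }
}
```

By default a parallel stream runs on the common ForkJoinPool, whose size is derived from the number of available cores, so this variant does not let you pick the thread count directly.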

Using an ExecutorService

Suppose you want 5 threads and you want to be able to wait until the tasks complete. Then you can use a fixed thread pool with 5 threads and keep the Futures so you can wait until they are done.

    final ExecutorService executor = Executors.newFixedThreadPool(5); // it's just an arbitrary number
    final List<Future<?>> futures = new ArrayList<>();
    for (SellerNames sellerNames : sellerDataList) {
        for (final String sellerName : sellerNames) {
            Future<?> future = executor.submit(() -> {
                getSellerAddress(sellerName);
            });
            futures.add(future);
        }
    }
    try {
        for (Future<?> future : futures) {
            future.get(); // do anything you need, e.g. isDone(), ...
        }
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdown(); // otherwise the pool's non-daemon threads keep the JVM alive
    }
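
If you want the chunking described in the question (10 threads, each handling a contiguous slice of 500 names), one possible approach is to split each name list into slices and submit one task per slice with `invokeAll`. This is a rough sketch, not a definitive implementation: `SellerNames` is assumed to be usable as a `List<String>`, `getSellerAddress` is a hypothetical stand-in for the real call, and `partition` and `lookup` are helper names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ChunkedSellerLookup {

    // Hypothetical stand-in for the real API call from the question.
    static String getSellerAddress(String sellerName) {
        return "address of " + sellerName;
    }

    // Splits items into at most `chunks` contiguous slices of near-equal size.
    static <T> List<List<T>> partition(List<T> items, int chunks) {
        List<List<T>> result = new ArrayList<>();
        int chunkSize = (items.size() + chunks - 1) / chunks; // ceiling division
        for (int start = 0; start < items.size(); start += chunkSize) {
            result.add(items.subList(start, Math.min(start + chunkSize, items.size())));
        }
        return result;
    }

    // Looks up one list of names with one task per chunk; results keep the input order.
    static List<String> lookup(List<String> sellerNames, ExecutorService executor, int chunks)
            throws InterruptedException, ExecutionException {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (List<String> chunk : partition(sellerNames, chunks)) {
            tasks.add(() -> {
                List<String> addresses = new ArrayList<>();
                for (String name : chunk) {
                    addresses.add(getSellerAddress(name));
                }
                return addresses;
            });
        }
        List<String> all = new ArrayList<>();
        // invokeAll blocks until every task has completed and returns futures in task order.
        for (Future<List<String>> future : executor.invokeAll(tasks)) {
            all.addAll(future.get());
        }
        return all;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            List<String> names = new ArrayList<>();
            for (int i = 0; i < 5000; i++) {
                names.add("seller" + i);
            }
            System.out.println(lookup(names, executor, 10).size());
        } finally {
            executor.shutdown();
        }
    }
}
```

The same executor can be reused for every entry of `sellerDataList`. As for the pool size: with the figures given in the comments on the question (about 100 ms per call, up to 200 requests per second), roughly 200 × 0.1 = 20 calls in flight would already reach the API limit, so a pool of around 20 threads is probably the most that is worth trying.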
Tamas Rev

If you are using a parallel stream, you can still control the number of threads by running it in your own ForkJoinPool.

    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
      .collect(Collectors.toList());

    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
      () -> aList.parallelStream().reduce(0L, Long::sum)).get();

This is described very well on this site: https://www.baeldung.com/java-8-parallel-streams-custom-threadpool
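
Applied to the seller lookup from the question, the same trick might look like the sketch below. The assumptions are that the names are available as a `List<String>` and that `getSellerAddress` is a hypothetical stand-in for the real call. The `lookup` helper is invented for the example. Note that running a parallel stream inside a custom pool relies on how the stream implementation behaves. As far as I know, the stream API documentation does not promise it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolSellerLookup {

    // Hypothetical stand-in for the real API call from the question.
    static String getSellerAddress(String sellerName) {
        return "address of " + sellerName;
    }

    // Runs the parallel stream from inside a task of a dedicated pool,
    // so the stream's work is executed by that pool's worker threads.
    static List<String> lookup(List<String> sellerNames, int parallelism)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> sellerNames.parallelStream()
                    .map(CustomPoolSellerLookup::getSellerAddress)
                    .collect(Collectors.toList())).get();
        } finally {
            pool.shutdown(); // release the worker threads once the result is in
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(lookup(Arrays.asList("alice", "bob", "carol"), 4));
    }
}
```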

Jay