64

Say I have a task like:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?

I do not need an answer that strictly matches the code above, just a general answer. But if you need more info: my tasks are IO bound, this is for a Spring Web application, and the tasks are going to be executed in an HTTP request.

blacktide
Eduardo

10 Answers

87

I would recommend taking a look at ExecutorService.

In particular, something like this:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Note that using newCachedThreadPool could be bad if objects is a big list: a cached thread pool can create one thread per task! You may want to use newFixedThreadPool(n) instead, where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound; for I/O-bound tasks a larger n usually makes sense, since the threads spend most of their time waiting).
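
As a rough sketch of the fixed-pool variant described above, using Java 8 lambdas instead of anonymous classes: the compute() here is a hypothetical stand-in that just sleeps to simulate an I/O wait, and the class name, method names and pool size of 8 are assumptions chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FixedPoolSketch {
    // Hypothetical stand-in for compute(): sleeps to simulate an I/O wait.
    static String compute(Integer input) throws InterruptedException {
        Thread.sleep(50);
        return "result-" + input;
    }

    // Runs compute() for every input on a pool of at most poolSize threads.
    static List<String> computeAll(List<Integer> inputs, int poolSize)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Integer input : inputs) {
                tasks.add(() -> compute(input));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done and returns
            // the futures in the same order as the task list.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> inputs = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            inputs.add(i);
        }
        System.out.println(computeAll(inputs, 8));
    }
}
```

In a web application you would normally share one pool across requests rather than create a new one per call as this sketch does.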

Here's full code that actually runs:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
overthink
  • Is there a c# version of this? – Malfist Jan 06 '10 at 20:48
  • Also look at Executors, which functions as a factory for various flavors of executor services. – Rob H Jan 06 '10 at 20:49
  • @Malfist in C# there's tasks (well for the upcoming .net 4) that make all of these a breeze :). And there are delegates/lambdas and threads, funcs, threadstart, etc to do it in 3.5 – Francisco Noriega Jan 16 '10 at 18:44
  • @Malfist, I know this is an old comment, but C# has [Parallel.ForEach](https://msdn.microsoft.com/en-us/library/dd460720%28v=vs.110%29.aspx) and the [Task Parallels Library - aka TPL](https://msdn.microsoft.com/en-us/library/dd460717%28v=vs.110%29.aspx) now. They're pretty complete. – Machado Oct 25 '16 at 19:42
9

With Java 8 and later you can use a parallelStream on the collection to achieve this:

List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

Note: the tasks may run in any order, but for an ordered source such as a List, collect(Collectors.toList()) still returns the results in the same order as the objects list.

Details on how to set the number of threads are available in this Stack Overflow question: how-many-threads-are-spawned-in-parallelstream-in-java-8
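
To see which threads a parallel stream actually uses, a small sketch along these lines may help. By default the work runs on the common ForkJoinPool plus the calling thread. The compute() below is a hypothetical stand-in (it sleeps briefly and squares its input), and the class and field names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ParallelStreamSketch {
    // Names of the threads that ran compute(); filled in concurrently.
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for compute(): sleeps briefly, then squares the input.
    static int compute(int input) {
        THREADS.add(Thread.currentThread().getName());
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input * input;
    }

    static List<Integer> computeAll(List<Integer> inputs) {
        return inputs.parallelStream()
                .map(ParallelStreamSketch::compute)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
        // Thread names vary by machine and run.
        System.out.println("Threads used: " + THREADS);
    }
}
```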

i000174
  • This in my view is a code smell. You are blocking all other code using parallelStream. In a test or small app it might be ok, but on a big server this *might* be a recipe for disaster. – user482745 Oct 17 '18 at 11:44
  • Streams are designed for data parallelism, not task parallelism. See https://stackoverflow.com/a/23370799/208288. – Laird Nelson Nov 02 '18 at 21:35
  • @LairdNelson : good link, but Brian seems to talk about parallelism in general, and not about using streams, right? – serv-inc Feb 21 '22 at 10:55
6

One can simply create a few threads and collect the results.

Mythread t = new Mythread(object); // Mythread: your own Thread subclass
t.start();

if (t.done()) { // done(): a method you define on Mythread
   // get result
   // add result
}
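
A minimal runnable sketch of the raw-thread idea, assuming a hypothetical Worker class that extends Thread and stores its own result (here simply the input doubled, standing in for compute(object)). It starts one thread per task and then joins each one before reading its result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RawThreadSketch {
    // Hypothetical worker: a Thread that stores the result of its computation.
    static class Worker extends Thread {
        private final int input;
        private int result;

        Worker(int input) {
            this.input = input;
        }

        @Override
        public void run() {
            result = input * 2; // stand-in for compute(object)
        }

        int getResult() {
            return result;
        }
    }

    static List<Integer> computeAll(List<Integer> inputs) throws InterruptedException {
        List<Worker> workers = new ArrayList<>();
        for (int input : inputs) {
            Worker worker = new Worker(input);
            worker.start(); // one thread per task
            workers.add(worker);
        }
        List<Integer> results = new ArrayList<>();
        for (Worker worker : workers) {
            worker.join(); // wait for this worker; join() also makes its result visible
            results.add(worker.getResult());
        }
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(computeAll(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```

This creates as many threads as there are tasks, so it only suits short lists.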

EDIT : I think other solutions are cooler.

fastcodejava
2

For a more detailed answer, read Java Concurrency in Practice and use java.util.concurrent.

Adam Goode
1

Here's something I use in my own projects:

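import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelTasks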
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Which prints a bit over 2000 on my dual-core box.

Jonathan Feinberg
1

A neat way is to utilize ExecutorCompletionService.

Say you have the following code (as in your example):

 public static void main(String[] args) {
    List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      List<Character> result = computeLettersBefore(letter);
      list.add(result);
    }

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }

Now, to execute the tasks in parallel, all you need to do is create an ExecutorCompletionService backed by a thread pool, then submit the tasks and read the results. Since ExecutorCompletionService puts finished tasks on a LinkedBlockingQueue under the hood, each result can be picked up as soon as its task completes (if you run the code you will notice that the results arrive in completion order, not submission order):

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);

    final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      completionService.submit(() -> computeLettersBefore(letter));
    }

    // NOTE: instead of iterating over letters again, the number of submitted tasks can be used as the loop bound
    for (char letter : letters) {
      final List<Character> result = completionService.take().get();
      list.add(result);
    }

    threadPool.shutdownNow(); // NOTE: for safety, place this inside a finally block

    System.out.println(list);
  }

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }
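
The two NOTE comments in the code above (count the submitted tasks, and shut the pool down in a finally block) could be applied roughly as follows. This is only a sketch: the lengthsOf method and its word-length task are hypothetical stand-ins for the letter computation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {
    // Computes the length of each word in parallel; results come back in completion order.
    static List<Integer> lengthsOf(List<String> words)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            ExecutorCompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            int submitted = 0;
            for (String word : words) {
                service.submit(() -> word.length());
                submitted++;
            }
            List<Integer> results = new ArrayList<>();
            // take() blocks until the next finished task is available.
            for (int i = 0; i < submitted; i++) {
                results.add(service.take().get());
            }
            return results;
        } finally {
            pool.shutdown(); // runs even if a task failed
        }
    }

    public static void main(String[] args) throws Exception {
        // The order of the printed lengths may differ between runs.
        System.out.println(lengthsOf(Arrays.asList("a", "bb", "ccc")));
    }
}
```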
walkeros
0

You can use the ThreadPoolExecutor. Here is sample code: http://programmingexamples.wikidot.com/threadpoolexecutor (too long to bring it here)
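
For readers who do not want to follow the link, here is a short sketch of constructing a ThreadPoolExecutor directly. It is not the linked sample. The pool sizes, queue capacity, rejection policy and the doubling task are all assumptions picked for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorSketch {
    static List<Integer> doubleAll(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                          // core pool size
                4,                                          // maximum pool size
                30, TimeUnit.SECONDS,                       // idle timeout for threads above the core size
                new ArrayBlockingQueue<>(100),              // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // if saturated, run the task on the caller
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (Integer input : inputs) {
                futures.add(pool.submit(() -> input * 2)); // stand-in for compute(object)
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that task is done
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(doubleAll(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```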

David Rabinowitz
0

I too was going to mention an executor class. Here is some example code that you would place in the executor class.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

    public void executeThreads(){
        try {
            threadLauncher.invokeAll(this.callableList);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Then to use it you would make calls to the executor class to populate and execute it.

executor.addCallable( some implementation of callable) // do this once for each task 
Object[] results = executor.getResult();
mkamowski
0

Fork/Join's parallel array is one option
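
The parallel array class itself is not used here. As a loosely related sketch, this is how the fork/join framework in java.util.concurrent (ForkJoinPool and RecursiveTask) could split a list of inputs. The ComputeTask class, the threshold of 4 and the "add one" computation are assumptions for illustration. Fork/join is aimed mainly at CPU-bound work, so it may fit I/O-bound tasks less well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSketch {
    // Splits the input list in halves until the chunks are small, then computes directly.
    static class ComputeTask extends RecursiveTask<List<Integer>> {
        private static final int THRESHOLD = 4;
        private final List<Integer> inputs;

        ComputeTask(List<Integer> inputs) {
            this.inputs = inputs;
        }

        @Override
        protected List<Integer> compute() {
            if (inputs.size() <= THRESHOLD) {
                List<Integer> results = new ArrayList<>();
                for (int input : inputs) {
                    results.add(input + 1); // stand-in for compute(object)
                }
                return results;
            }
            int mid = inputs.size() / 2;
            ComputeTask left = new ComputeTask(inputs.subList(0, mid));
            ComputeTask right = new ComputeTask(inputs.subList(mid, inputs.size()));
            left.fork();                                  // run the left half asynchronously
            List<Integer> rightResults = right.compute(); // run the right half on this thread
            List<Integer> results = new ArrayList<>(left.join());
            results.addAll(rightResults);
            return results;
        }
    }

    static List<Integer> computeAll(List<Integer> inputs) {
        return ForkJoinPool.commonPool().invoke(new ComputeTask(inputs));
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
    }
}
```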

Michael Barker
0

I know it's an old, old thread, but since RxJava came out (it is now at v3), my favorite way to do parallel programming is through its flatMap operator, as in the following few lines (it is not very intuitive at first sight):

// Assume we're in main thread at the moment
Flowable.create(...) // upstream data provider, main thread
  .map(...) // some transformers, main thread
  .filter(...) // some filter, main thread
  .flatMap(data -> Flowable.just(data)
               .subscribeOn(Schedulers.from(...your executorservice for the sub worker.....))
               .doOnNext(this::process)
           , true, MAX_CONCURRENT) // true delays errors; MAX_CONCURRENT caps the number of concurrent workers
  .subscribe();

You can check its Javadoc (RxJava 3 - Flowable) to understand the operators. A simple example:

Flowable.range(1, 100)
                .map(Object::toString)
                .flatMap (i -> Flowable.just(i)
                        .doOnNext(j -> {
                            System.out.println("Current thread is " + Thread.currentThread().getName());
                            Thread.sleep(100);
                        }).subscribeOn(Schedulers.io()), true, 10)
        
                .subscribe(
                        integer -> log.info("We consumed {}", integer),
                        throwable -> log.error("We met errors", throwable),
                        () -> log.info("The stream completed!!!"));

And for your case:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

We could try:

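Flowable.fromIterable(objects)
        .flatMap(obj ->
                    // fromCallable defers compute(obj) until subscription, so it runs on the io() scheduler
                    Flowable.fromCallable(() -> compute(obj)).subscribeOn(Schedulers.io()), true, YOUR_CONCURRENCY_NUMBER)
        .doOnNext(res -> list.add(res))
        .blockingSubscribe(); // waits for all results; a plain subscribe() would return immediately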

Bonus points: if you need some ordering, for example odd numbers all go to worker1 and even numbers to worker2, RxJava can achieve that easily by combining the groupBy and flatMap operators. I won't go into too much detail about them here. Enjoy playing :)))

Ying Cherry