12

What is the proper way to implement concurrency in Java applications? I know about Threads and so on, of course; I have been programming in Java for 10 years now, but I haven't had much experience with concurrency.

For example, I have to load a few resources asynchronously, and only after all of them have loaded can I proceed and do more work. Needless to say, there is no guaranteed order in which they will finish. How do I do this?

In JavaScript, I like using the jQuery.Deferred infrastructure to say

$.when(deferred1,deferred2,deferred3...)
 .done(
   function(){//here everything is done
    ...
   });

But what do I do in Java?

jotik
f.khantsis

8 Answers

9

I would use a parallel stream.

Stream.of(runnable1, runnable2, runnable3).parallel().forEach(r -> r.run());
// do something after all these are done.

If you need this to be asynchronous, you might run it on a thread pool or a separate Thread.

I have to asynchronously load a few resources,

You could collect these resources like this.

List<String> urls = ....

Map<String, String> map = urls.parallelStream()
                              .collect(Collectors.toMap(u -> u, u -> download(u)));

This will give you a mapping of all the resources once they have been downloaded concurrently. By default, the degree of parallelism is roughly the number of CPUs you have.

Peter Lawrey
  • @RobinJonsson forEach doesn't return until all the tasks have completed. There is no support for running stream code asynchronously which is why you would need to do something else if you need this. – Peter Lawrey Apr 15 '16 at 11:07
  • 1
    You don't always have a Runnable, and you don't always have control over the async part. For example, I am calling an external method called `getMapAsync(OnMapReadyCallback)`, which returns immediately and starts its own thread. There are other things I need to load in a similar manner, and then run code after everything is done. I think the best solution I have found here is `CountDownLatch`. – f.khantsis Apr 15 '16 at 11:44
8

You can achieve it in multiple ways.

1. ExecutorService invokeAll() API

Executes the given tasks, returning a list of Futures holding their status and results when all complete.

2. CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

3. ForkJoinPool, or newWorkStealingPool() in Executors, is another way
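
As a rough illustration of option 1, here is a minimal sketch of invokeAll() waiting for a batch of loads. The load() helper, the pool size of 4, and the resource names are assumptions made up for the example, not part of any real API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {

    // Hypothetical stand-in for a slow resource load.
    static String load(String name) {
        return "contents of " + name;
    }

    // Runs every load on a pool and returns only when all of them are done.
    static List<String> loadAll(List<String> names) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String name : names) {
                tasks.add(() -> load(name));
            }
            // invokeAll blocks until every task has completed or failed.
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // already done; rethrows a task failure
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a load failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList("a.txt", "b.txt", "c.txt")));
    }
}
```

The futures come back in the same order as the submitted tasks, regardless of which task finished first.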

Have a look at related SE questions:

How to wait for a thread that spawns it's own thread?

Executors: How to synchronously wait until all tasks have finished if tasks are created recursively?

Community
Ravindra babu
  • Great answer. I'd also add CyclicBarrier — it has a constructor, which accepts a Runnable, that will be executed when all tasks are finished: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html#CyclicBarrier-int-java.lang.Runnable- – Miha_x64 Aug 22 '17 at 20:51
  • Which is the correct way to do the same thing, but without blocking any threads? – Miha_x64 Aug 22 '17 at 20:52
4

If I'm not using parallel Streams or Spring MVC's TaskExecutor, I usually use CountDownLatch. Instantiate it with the number of tasks, and call countDown() once for each thread that completes its task. CountDownLatch.await() blocks until the count reaches 0. Really useful.

Read more here: JavaDocs
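
For what it's worth, here is a minimal sketch of that pattern applied to a callback-style API that starts its own threads. The loadAsync() method is a hypothetical stand-in for such an API, and the 10-second timeout is an arbitrary safety net.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchSketch {

    // Hypothetical callback-style loader: returns at once and
    // invokes onReady later from a thread of its own.
    static void loadAsync(String name, Runnable onReady) {
        new Thread(onReady, "loader-" + name).start();
    }

    // Starts all loads, then waits until every callback has fired.
    // Returns how many callbacks ran, or -1 on timeout or interruption.
    static int loadAllAndWait(String... names) {
        CountDownLatch latch = new CountDownLatch(names.length);
        AtomicInteger loaded = new AtomicInteger();
        for (String name : names) {
            loadAsync(name, () -> {
                loaded.incrementAndGet();
                latch.countDown(); // one count per finished resource
            });
        }
        try {
            // Block until the count reaches zero or the timeout expires.
            if (!latch.await(10, TimeUnit.SECONDS)) {
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return loaded.get();
    }

    public static void main(String[] args) {
        System.out.println(loadAllAndWait("map", "config", "icons"));
    }
}
```

Note that await() blocks the calling thread, so this suits a background thread better than a UI thread.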

Ravindra babu
Robin Jonsson
  • 1
    What's the difference between this and Semaphore? – f.khantsis Apr 15 '16 at 11:35
  • 1
    @doom777 CountDownLatch is used to start a series of threads and then wait until all of them are complete (or until they call countDown() a given number of times). Semaphore is used to control the number of concurrent threads that are using a resource. That resource can be something like a file, or could be the CPU, by limiting the number of threads executing. The count on a Semaphore can go up and down as different threads call acquire() and release(). – Robin Jonsson Apr 15 '16 at 12:40
  • 1
    Yes, but isn't `CountDownLatch` just a Semaphore that can't go up? – f.khantsis Apr 15 '16 at 12:50
  • 1
    It has more usage than that. It can basically function as a simple executor-pool. Using it for the sole purpose of counting in "one direction" (be it up _or_ down) seems a bit pointless to me. – Robin Jonsson Apr 15 '16 at 20:25
3

Personally, I would do something like this if I am using Java 8 or later.

// Retrieving instagram followers
CompletableFuture<Integer> instagramFollowers = CompletableFuture.supplyAsync(() -> {
    // getInstaFollowers(userId);

    return 0; // default value
});

// Retrieving twitter followers
CompletableFuture<Integer> twitterFollowers = CompletableFuture.supplyAsync(() -> {
    // getTwFollowers(userId);

    return 0; // default value
});

System.out.println("Calculating Total Followers...");
CompletableFuture<Integer> totalFollowers = instagramFollowers
    .thenCombine(twitterFollowers, (instaFollowers, twFollowers) -> {
         return instaFollowers + twFollowers; // can be replaced with method reference
});

System.out.println("Total followers: " + totalFollowers.get()); // blocks until both the above tasks are complete

I used supplyAsync() because the tasks return a value (the number of followers in this case); otherwise I could have used runAsync(). Both run the task on a separate thread.

Finally, I used thenCombine() to combine the two CompletableFutures. You could use thenCompose() instead if one depended on the result of the other, but since both tasks can run in parallel here, I used thenCombine().

The methods getInstaFollowers(userId) and getTwFollowers(userId) stand for simple HTTP calls or similar.
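
To make the thenCompose() versus thenCombine() distinction concrete, here is a small sketch. The findUserId() and countFollowers() helpers are hypothetical stand-ins for real HTTP calls, and the values they return are made up.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsCombineSketch {

    // Hypothetical lookups standing in for real HTTP calls.
    static CompletableFuture<String> findUserId(String handle) {
        return CompletableFuture.supplyAsync(() -> "id-" + handle);
    }

    static CompletableFuture<Integer> countFollowers(String userId) {
        return CompletableFuture.supplyAsync(() -> userId.length());
    }

    // Dependent steps: the second call needs the result of the first.
    static CompletableFuture<Integer> followersOf(String handle) {
        return findUserId(handle).thenCompose(id -> countFollowers(id));
    }

    // Independent steps: both chains run concurrently and are merged at the end.
    static CompletableFuture<Integer> totalFollowers(String a, String b) {
        return followersOf(a).thenCombine(followersOf(b), Integer::sum);
    }

    public static void main(String[] args) {
        // join() blocks like get(), but throws only unchecked exceptions.
        System.out.println(totalFollowers("bob", "alice").join());
    }
}
```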

Ram Patra
1

You can use a ThreadPool and Executors to do this.

https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html

NeoP5
1

Here is an example that uses threads. It's a static ExecutorService with a fixed pool of 50 threads.

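import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.util.concurrent.ThreadFactoryBuilder; // assuming Guava

// Note: this name shadows java.util.concurrent.ThreadPoolExecutor.
public class ThreadPoolExecutor {

    // Shared pool of 50 worker threads named thread-0, thread-1, ...
    private static final ExecutorService executorService = Executors.newFixedThreadPool(50,
            new ThreadFactoryBuilder().setNameFormat("thread-%d").build());

    private static final ThreadPoolExecutor instance = new ThreadPoolExecutor();

    public static ThreadPoolExecutor getInstance() {
        return instance;
    }

    // Submits a task and returns a Future for its result.
    public <T> Future<? extends T> queueJob(Callable<? extends T> task) {
        return executorService.submit(task);
    }

    // Stops accepting new tasks; tasks already queued still run.
    public void shutdown() {
        executorService.shutdown();
    }
}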

The business logic for the executor looks like this. (You can use Callable or Runnable. A Callable can return a value; a Runnable cannot.)

public class MultipleExecutor implements Callable<ReturnType> { /* your code */ }

And the call to the executor:

ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor.getInstance();

List<Future<? extends ReturnType>> results = new LinkedList<>();

// Queue one job per item; the jobs run concurrently on the pool.
for (Type type : typeList) {
    Future<? extends ReturnType> future = threadPoolExecutor.queueJob(
            new MultipleExecutor(/* needed parameters */));
    results.add(future);
}

// get() blocks until the corresponding job has finished.
for (Future<? extends ReturnType> result : results) {
    try {
        ReturnType value = result.get(); // here you get the return of one thread
        if (value != null) {
            // use value
        }
    } catch (InterruptedException | ExecutionException e) {
        logger.error(e, e);
    }
}
Patrick
1

In Java 8 you can achieve the same behaviour as jQuery's $.Deferred with a class called CompletableFuture. This class provides an API for working with promises. To create async code, you can use one of its static factory methods, such as #runAsync or #supplyAsync, and then apply further computation to the result with #thenApply.
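
The closest analogue to $.when(...) that I know of is CompletableFuture.allOf(), which this answer does not mention by name. Below is a minimal sketch under that assumption; loadAsync() is a hypothetical loader invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical loader standing in for a real asynchronous call.
    static CompletableFuture<String> loadAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "contents of " + name);
    }

    // Completes once every load has completed, in whatever order they finish.
    static CompletableFuture<List<String>> loadAll(List<String> names) {
        List<CompletableFuture<String>> loads = names.stream()
                .map(AllOfSketch::loadAsync)
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(loads.toArray(new CompletableFuture[0]))
                // Every future is done at this point, so join() does not block.
                .thenApply(ignored -> loads.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // thenAccept plays the role of .done(...); join() only keeps main alive.
        loadAll(Arrays.asList("a.txt", "b.txt"))
                .thenAccept(System.out::println)
                .join();
    }
}
```

Chaining thenAccept() instead of calling get() keeps the caller from blocking while the loads are in flight.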

hahn
0

I usually opt for an async notify-start, notify-progress, notify-end approach:

class Task extends Thread {
    private ThreadLauncher parent;

    public Task(ThreadLauncher parent) {
        super();
        this.parent = parent;
    }

    public void run() {
        doStuff();

        parent.notifyEnd(this);
    }

    public /*abstract*/ void doStuff() {
        // ...
    }
}


class ThreadLauncher {

    public void stuff() {
        for (int i=0; i<10; i++)
            new Task(this).start(); 
    }

    public void notifyEnd(Task who) {
        // ...
    }
}
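
The notifyEnd() body is left open above. One way to detect that all tasks have finished is a shared counter, sketched below. This is an assumption about how the hook might be filled in, not the answer's own code; CountingLauncher and onAllDone are hypothetical names, and taskCount must match the number of jobs started.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CountingLauncher {
    private final AtomicInteger remaining;
    private final Runnable onAllDone; // hypothetical completion hook

    CountingLauncher(int taskCount, Runnable onAllDone) {
        this.remaining = new AtomicInteger(taskCount);
        this.onAllDone = onAllDone;
    }

    // Called by each worker when it finishes; safe to call concurrently.
    void notifyEnd() {
        // Only the call that brings the counter to zero runs the hook.
        if (remaining.decrementAndGet() == 0) {
            onAllDone.run();
        }
    }

    // Starts one thread per job; each reports back through notifyEnd().
    void start(Runnable... jobs) {
        for (Runnable job : jobs) {
            new Thread(() -> {
                try {
                    job.run();
                } finally {
                    notifyEnd(); // report even if the job threw
                }
            }).start();
        }
    }
}
```

The hook runs on whichever worker thread finishes last, so it should not assume it is on the launching thread.
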
Exceptyon