5

Im looking for a good solution of coordinating several multithreading tasks.

Basically I have 2 tasks, I call A and B that need to be executed on a different thread than the main thread.

But B must be started after A has completed. A and B themselfes contain of several parts who should run parallel, called A1, A2, ... B1, B2, ....

And there is a caller from outside, who needs restart the whole job regardless of the progress. How can I archieve that? I thought of creating some sort of boolean array holding the information if each subtask (A1, ...) has already completed and if so start B. And check every few lines of code in each method if a cancellation has already been made. But it seems to me, that that is not an elegant solution and that there ways to coordinate excatly this.

enter image description here

Paul Woitaschek
  • 6,717
  • 5
  • 33
  • 52
  • 3
    You want ExecutorServices. And you want to wait for Futures. All of which can be found at Oracle's Java Documentation pages. – Fildor Mar 10 '15 at 10:38
  • Do your subtasks (A1, A2...) return some result? If yes, there is useful `java.util.concurrent.FutureTask` to manage such process: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/FutureTask.html. If no, use `CountDownLatch` from the same package. – Alex Salauyou Mar 10 '15 at 10:38
  • http://stackoverflow.com/questions/1361029/waiting-on-multiple-threads-to-complete-in-java – StanislavL Mar 10 '15 at 10:40

3 Answers3

2

In Java8, you can use CompletableFutures. The method execA sets off three parallel tasks and returns a CompletableFuture which consists of all these tasks. execB waits for this composite task to complete and then sets off a set of tasks of its own. Finally, the get in the main method waits for the B methods to complete.

public class Futures {
    String name;
    int value;

    public static void main(String[] args) {
        try {
        execB(execA()).get();
        } catch(InterruptedException|ExecutionException e) {}
    }
    Futures(String name, int value) {
        this.name = name;
        this.value = value;
    }

    void runMethod() {
        System.out.println("Entering " + name);
        try {
            Thread.sleep(value * 1000);
        } catch(InterruptedException e) {}
        System.out.println("Exiting " + name);
    }
    public static CompletableFuture<Void> execA() {
        return(
            CompletableFuture.<Void>allOf(
            CompletableFuture.runAsync(() -> (new Futures("a1", 4)).runMethod()),
            CompletableFuture.runAsync(() -> (new Futures("a2", 2)).runMethod()),
            CompletableFuture.runAsync(() -> (new Futures("a3", 1)).runMethod()))
        );
    }
    public static CompletableFuture<Void> execB(CompletableFuture<Void> prev) {
        try {
            prev.get();
        } catch (InterruptedException|ExecutionException e) {}
        return(
            CompletableFuture.<Void>allOf(
            CompletableFuture.runAsync(() -> (new Futures("b1", 2)).runMethod()),
            CompletableFuture.runAsync(() -> (new Futures("b2", 3)).runMethod()),
            CompletableFuture.runAsync(() -> (new Futures("b3", 1)).runMethod())));
    }
}
Neil Masson
  • 2,609
  • 1
  • 15
  • 23
0

The following is a sample implementation using countDownLatches and Exectors:

public class Test {
    static ExecutorService maintaskExecutor = Executors.newFixedThreadPool(2);

    private static CountDownLatch latch = new CountDownLatch(0);


    public Test() {
    }


    public static void main(String[] args) {
        maintaskExecutor.submit(new runnableA());
        maintaskExecutor.submit(new runnableB());

    }

    private void restart() {
        maintaskExecutor.shutdownNow();
        maintaskExecutor.submit(new runnableA());
        maintaskExecutor.submit(new runnableB());
    }


    private static class runnableA implements Runnable {
        ExecutorService taskExecutorA = Executors.newFixedThreadPool(3);
        private final CountDownLatch latchA = new CountDownLatch(3);


        @Override
        public void run() {

            try {
                Runnable a1Runnable = createA1Runnable();
                Runnable a2Runnable = createA1Runnable();
                Runnable a3Runnable = createA1Runnable();

                taskExecutorA.submit(a1Runnable);
                taskExecutorA.submit(a2Runnable);
                taskExecutorA.submit(a3Runnable);

                latchA.await();
                latch.countDown();
            } catch (InterruptedException e) {
                taskExecutorA.shutdownNow();
            }
        }

        private Runnable createA1Runnable() {
            return new Runnable() {
                @Override
                public void run() {
                    //Design this task to respond to interruption by checking if the thread has been interrupted
                    while(!Thread.interrupted()){
                        //Do the work
                    }

                    return;
                }
            };
        }
    }

      private  static class runnableB implements Runnable{
              private final CountDownLatch latch = new CountDownLatch(3);
              ExecutorService taskExecutorB = Executors.newFixedThreadPool(3);

              public void run(){
                  try {
                    latch.await();
                    //Creates the tasks B1, B2, ...


                } catch (InterruptedException e) {
                    taskExecutorB.shutdownNow();
                }
             }
}

}
javaHunter
  • 1,097
  • 6
  • 9
0

Assuming you need the outputs of the subtasks, you can use thenCombine in CompletableFuture

    CompletableFuture<String> a1 = CompletableFuture.supplyAsync(() -> "a1");
    CompletableFuture<String> a2 = CompletableFuture.supplyAsync(() -> "a2");
    CompletableFuture<String> a3 = CompletableFuture.supplyAsync(() -> "a3");
    CompletableFuture<String> a = a1.thenCombine(a2, (a1r, a2r) -> "combination of a1 and a2").thenCombine(a3,
            (a1anda2r, a3r) -> "combination of a1,a2,a3");

    CompletableFuture<String> b1 = CompletableFuture.supplyAsync(() -> "a1");
    CompletableFuture<String> b2 = CompletableFuture.supplyAsync(() -> "a2");
    CompletableFuture<String> b3 = CompletableFuture.supplyAsync(() -> "a3");
    CompletableFuture<String> b = a.thenCombine(b1, (ar, b1r) -> "combination of a and b1")
            .thenCombine(b2, (aAndb1, b2r) -> "combination of a,b1,b2")
            .thenCombine(b3, (aAndb1Andb2, b3r) -> "combination of A and B");

If the output is not needed, you can use the allOf solution from Neil Masson

Ruben
  • 3,986
  • 1
  • 21
  • 34