5

I have 2 methods that have different return types that I want to run at the same time. Here is my code:

public void method(int id) throws InterruptedException, ExecutionException {
    final CompletableFuture<List<FooA>> fooACF = CompletableFuture.supplyAsync(() -> generateFooA(id));
    final CompletableFuture<List<FooB>> fooBCF = CompletableFuture.supplyAsync(() -> generateFooB(id));
    List<FooA> fooAs = fooACF.get();
    List<FooB> fooBs = fooBCF.get();
    //Do more processing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}

But I don't know if both methods will run in parallel with the above code or if I'm just better off saying:

List<FooA> fooAs = generateFooA(id);
List<FooB> fooBs = generateFooB(id);

How do I use CompletableFuture properly to run both methods in parallel?

Richard
  • Check out this post https://stackoverflow.com/questions/3376586/how-to-start-two-threads-at-exactly-the-same-time – Matt Nov 20 '17 at 21:50
  • 4
    Your code looks fine. Do you have some specific concern? – shmosel Nov 20 '17 at 21:56
  • 4
    If you're asking whether what you did will run those 2 methods on 2 separate threads then yes you used `CompletableFuture` properly. – Oleg Nov 20 '17 at 21:56

4 Answers

17

Your code works fine, using threads supplied by the ForkJoinPool.commonPool(), as promised by the JavaDoc for CompletableFuture.supplyAsync(Supplier<U> supplier). You can prove it in a quick-and-dirty manner by adding some sleep() and println() statements. I've simplified your code a bit by using String instead of List<Foo>:

public void method(int id) throws InterruptedException, ExecutionException {
    CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(id));
    CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(id));
    String fooA = cfa.get();
    String fooB = cfb.get();
    System.out.println("Final fooA " + fooA);
    System.out.println("Final fooB " + fooB);
}

public String generateA(int id) {
    System.out.println("Entering generateA " + Thread.currentThread());
    sleep(2000);
    System.out.println("Leaving generateA");
    return "A" + id;
}

public String generateB(int id) {
    System.out.println("Entering generateB " + Thread.currentThread());
    sleep(1000);
    System.out.println("Leaving generateB");
    return "B" + id;
}

private void sleep(int n) {
    try {
        Thread.sleep(n);
    } catch (InterruptedException ex) {
        // never mind
    }
}

Output is:

Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Entering generateB Thread[ForkJoinPool.commonPool-worker-2,5,main]
Leaving generateB
Leaving generateA
Final fooA A1
Final fooB B1

You can manually observe that the "Leaving" output lines appear after 1 second and 2 seconds. For more evidence you could add timestamps to the output. If you change the relative lengths of the sleeps, you'll see the "Leaving" output appear in a different order.
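For instance, a minimal timestamping sketch (the log helper is only an illustration, not part of the original code):

// Hypothetical helper: prefixes each message with the milliseconds elapsed
// since the class was loaded, so the ~1000 ms and ~2000 ms gaps show up
// directly in the output.
private static final long START = System.currentTimeMillis();

private static void log(String message) {
    System.out.println((System.currentTimeMillis() - START) + " ms  "
            + message + "  " + Thread.currentThread());
}

Replacing the println() calls above with log(...) would show the two "Leaving" lines at roughly 1000 ms and 2000 ms.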


If you omit the sleep()s, then it's entirely likely that the first thread will complete so quickly that it's finished before the second starts:

Entering generateA Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateA
Entering generateB Thread[ForkJoinPool.commonPool-worker-1,5,main]
Leaving generateB
Final fooA A1
Final fooB B1

Notice that it all happened so quickly that the first thread had already been returned to the pool by the time the runtime asked for a second thread, so the original thread was reused.

This might conceivably also happen for a very short sleep, although on my system a 1 ms sleep was enough to get two distinct worker threads every time I ran it. Of course the sleep() is a placeholder for a "real" operation that takes time to complete. If your real operation is so cheap that it finishes before the other thread starts, that's a good hint that this is a scenario in which multi-threading is not beneficial.
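As an aside (not part of the original answer): if you want to control which pool runs the tasks instead of sharing ForkJoinPool.commonPool(), supplyAsync has an overload that takes an explicit Executor. A minimal sketch:

// supplyAsync(Supplier, Executor) runs the supplier on the given executor
// rather than on ForkJoinPool.commonPool().
ExecutorService pool = Executors.newFixedThreadPool(2);
CompletableFuture<String> cfa = CompletableFuture.supplyAsync(() -> generateA(1), pool);
CompletableFuture<String> cfb = CompletableFuture.supplyAsync(() -> generateB(1), pool);
System.out.println(cfa.join() + " " + cfb.join());
pool.shutdown();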


However if you need to ask how to prove that things are happening concurrently, I wonder why you want them to happen concurrently in the first place. If there's no "real world" observable difference between your program when it's doing these tasks concurrently, or sequentially, then why not leave it running sequentially? It's easier to reason about sequential operations; there are lots of sneaky bugs associated with concurrency.

Perhaps you're hoping for a speed increase by multi-threading -- if so the increase in speed should be what you're measuring, not whether or not things are actually concurrent. And bear in mind that for an awful lot of tasks, a CPU can't perform them faster in parallel than in sequence.
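If that's the goal, a rough way to check (not from the original answer, and no substitute for a real benchmark harness such as JMH) is to time both variants of the sleep-based example above:

// Rough wall-clock comparison of the CompletableFuture version and the
// plain sequential calls. With the sleeps used above, expect roughly 2 s
// for the concurrent version and roughly 3 s for the sequential one.
long t0 = System.nanoTime();
method(1); // the CompletableFuture-based version above
System.out.println("concurrent: " + (System.nanoTime() - t0) / 1_000_000 + " ms");

t0 = System.nanoTime();
generateA(1); // plain sequential calls for comparison
generateB(1);
System.out.println("sequential: " + (System.nanoTime() - t0) / 1_000_000 + " ms");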

slim
0

You can also do it with the Java 8+ Stream API.
For example, suppose we have a calculation like this:

result = (a + b) + (a - c) + (c * b)

We split this calculation into several methods:

public class Calculator {

    public static int add(int a, int b) {
        sleep(); // Imagine this calculation takes several seconds
        return a + b;
    }

    public static int minus(int a, int b) {
        sleep(); // Imagine this calculation takes several seconds
        return a - b;
    }

    public static int multiply(int a, int b) {
        sleep(); // Imagine this calculation takes several seconds
        return a * b;
    }

    private static void sleep() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Note: each method sleeps for 2 seconds to simulate a long-running calculation.
Legacy (sequential) style:

  int a = 12;
  int b = 4;
  int c = 1;
  LocalTime startDateTime = LocalTime.now();
  int legacyCalculate = Calculator.add(a, b) + Calculator.minus(a, c) + Calculator.multiply(c, b);
  System.out.println("Result : " + legacyCalculate);
  LocalTime endDateTime = LocalTime.now();

  System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");

  // Result : 
  Result : 31
  Process time : 6 seconds

Parallel computation with an ExecutorService:

ExecutorService es = Executors.newCachedThreadPool();   
LocalTime startDateTime = LocalTime.now();
Future<Integer> r1 = es.submit(() -> Calculator.add(a, b));
Future<Integer> r2 = es.submit(() -> Calculator.minus(a, c));
Future<Integer> r3 = es.submit(() -> Calculator.multiply(c, b));
System.out.println("Result : " + (r1.get() + r2.get() + r3.get()));
LocalTime endDateTime = LocalTime.now();
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");
es.shutdown();     

// Result: 
Result : 31
Process time : 2 seconds      

Parallel computation with a Java 8+ parallel stream:

Supplier<Integer> r1 = () -> Calculator.add(a, b);
Supplier<Integer> r2 = () -> Calculator.minus(a, c);
Supplier<Integer> r3 = () -> Calculator.multiply(c, b);

LocalTime startDateTime = LocalTime.now();
int result = Stream.of(r1, r2, r3) 
        .parallel() // Please pay attention to this line
        .mapToInt(Supplier::get) 
        .sum();
LocalTime endDateTime = LocalTime.now();
System.out.println("Result : " + result);
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");

// Result: 
Result : 31
Process time : 2 seconds  
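
For completeness, and since the question itself used CompletableFuture, the same calculation could also be written with supplyAsync (this variant is not part of the original answer; it assumes the Calculator class and the a, b, c values from above):

LocalTime startDateTime = LocalTime.now();
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> Calculator.add(a, b));
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> Calculator.minus(a, c));
CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> Calculator.multiply(c, b));

// join() blocks until each future is done; all three suppliers were already
// submitted to ForkJoinPool.commonPool() by supplyAsync, so they run concurrently.
int result = f1.join() + f2.join() + f3.join();
LocalTime endDateTime = LocalTime.now();
System.out.println("Result : " + result);
System.out.println("Process time : " + startDateTime.until(endDateTime, ChronoUnit.SECONDS) + " seconds");

// Expected (when the common pool has at least three free workers):
// Result : 31
// Process time : 2 seconds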
mah454
-1

As I said in my comment, check out How to start two threads at "exactly" the same time, but this should be what you're looking for:

final CyclicBarrier gate = new CyclicBarrier(3);

public void method(int id) throws InterruptedException, BrokenBarrierException {
    Thread one = new Thread(() -> {
        try {
            gate.await();
            List<FooA> fooAs = generateFooA(id);
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    });
    Thread two = new Thread(() -> {
        try {
            gate.await();
            List<FooB> fooBs = generateFooB(id);
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    });
    one.start();
    two.start();
    gate.await(); // releases both threads at (roughly) the same time
    //Do more processing
}

public List<FooA> generateFooA(int id) {
    //code
}

public List<FooB> generateFooB(int id) {
    //code
}
Matt
  • 5
    Starting 2 additional threads and using `CompletableFuture` makes no sense at all. – Oleg Nov 20 '17 at 22:05
  • The `generateFooA(id)` call and the `generateFooB(id)` call potentially could be well under way, running in different threads, before either your `one` thread or your `two` thread gets to the `gate.await()` call. – Solomon Slow Nov 20 '17 at 22:34
  • 1
    You seem to think that calling `get()` was starting the tasks, but the tasks are scheduled right within `supplyAsync`. They will run to completion even if no-one is ever calling `get()` or `join()`. Besides that, running “exactly at the same time” is not a useful goal at all. Even if you manage to start two tasks at exactly the same time, they could be out of sync right within the next nanosecond. There are no guarantees about execution time or thread scheduling. – Holger Nov 21 '17 at 09:30
  • How are you going to access `fooAs` and `fooBs ` and do more processing with them? – Oleg Nov 21 '17 at 17:29
-1

You're missing an Executor:

ExecutorService executor = Executors.newCachedThreadPool();
List<Future<?>> futures = Stream.<Runnable>of(() -> generateFooA(id), () -> generateFooB(id))
        .map(executor::submit)
        .collect(Collectors.toList());
for (Future<?> future : futures) {
    future.get(); // do whatever you need here
}

The Runnables start executing as soon as you submit them. get() returns as soon as it can; e.g. if the first future you call get() on is the slowest one, all the other get() calls will return immediately.
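
A small sketch (illustrative, not from the original answer) of that last point:

// The total wait is governed by the slowest task, not by the order of the
// get() calls: by the time slow.get() returns, fast has long since finished.
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> slow = executor.submit(() -> { Thread.sleep(2000); return "slow"; });
Future<String> fast = executor.submit(() -> { Thread.sleep(500); return "fast"; });
System.out.println(slow.get()); // blocks for about 2 seconds
System.out.println(fast.get()); // returns immediately, already completed
executor.shutdown();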

Bohemian
  • 6
    Re, "You're missing an `Executor`." The `Executor` is implicit in the OP's example. The example submits tasks to the `ForkJoinPool.commonPool()`. – Solomon Slow Nov 20 '17 at 22:26
  • @James that's not the same thing. An Executor tries its best to parallelise execution and this code has exclusive use of it, so parallel execution is "likely". But the ForkJoinPool.commonPool() is shared among all streams and so parallel execution is less likely (but not impossible, of course). – Bohemian Nov 20 '17 at 22:36
  • 3
    Both approaches “try their best” to parallelize execution, however, you’ll never have a guaranty of parallel execution, even if you manage to create a distinct thread for each task. But even that is not guaranteed by using a thread pool executor. When the first worker thread manages to complete the first task before the second is submitted, it will pick up the second task (not that it matters then). If F/J’s common pool has no free capacity due to other tasks, it implies a workload occupying all cpu cores, so it *is* possible that the submitting thread does not run between the two tasks… – Holger Nov 21 '17 at 09:26
  • @Holger of course, but I'm talking about the situation where threads in the common pool are busy executing long-running tasks (which would be a poor choice, but I've seen it happen) and all streams in the JVM have to wait until they finish. If you have a separate executor, although its threads still have to compete for scheduling, they won't be queued up behind other tasks. I know this is an edge case, but the question asked for parallel execution. Also, if OP's tasks are long running, you wouldn't want them executing in the common pool because you'll have liveness problems everywhere else. – Bohemian Nov 21 '17 at 18:55
  • There is no problem in having potentially short-running tasks waiting for the completion of long running tasks, in fact, the total execution time might be even shorter than for interleaved execution. As long as all CPU cores are processing tasks, it isn’t a liveness problem, as long as you’re only talking about execution times. But if you’re talking about priorities, interactive tasks or I/O operations, just using another thread pool isn’t sufficient to solve the problem. Mind that for threads of equal priority, the specification doesn’t make a guarantee for preemptive task switching at all… – Holger Nov 22 '17 at 07:18
  • @Holger You're still missing my point: I've actually had serious liveness problems caused by processing a List of long-running (http call) tasks using a parallel stream. The rest of the server (that used streams) ground to a halt. – Bohemian Nov 22 '17 at 07:26
  • I assume, an http call *is* an I/O operation, isn’t it?, so, as said, it’s a different issue than just being *long-running*. Doing I/O in the F/J pool is strongly discouraged and if unavoidable, should be done with a [`ManagedBlocker`](https://docs.oracle.com/javase/8/docs/api/?java/util/concurrent/ForkJoinPool.ManagedBlocker.html) to allow spawning a compensation thread if a worker is blocked. For the Stream API, whose use of F/J is an implementation detail, I/O is just discouraged. So I didn’t miss your point, I actually identified your point, even when you were saying “long running” only… – Holger Nov 22 '17 at 07:33