16

I need to find a way to execute tasks (dependent and independent) in parallel in Java.

  1. Task A and Task C can run independently.
  2. Task B is dependent on the output of Task A.

I checked Future and Fork/Join in java.util.concurrent, but it looks like we cannot add a dependency to a task.

Can anyone point me to the correct Java API?

Steve McLeod
rocky
  • Have you considered having Task A notify Task B when it is complete? Before you start Task A, instantiate Task B and add it as an observer to Task A (See Observer Pattern). – David W Jun 01 '12 at 17:55
  • Guava's [`ListenableFuture`](http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained) is a bit friendlier about these things than plain Futures. – Louis Wasserman Jun 01 '12 at 21:40

8 Answers

13

In Scala this is very easy to do, and I think you are better off using Scala. Here is an example I pulled from http://danielwestheide.com/ (The Neophyte's Guide to Scala Part 16: Where to Go From Here). The author has a great blog (I am not the author).

Let's take a barista making coffee. The tasks are:

  1. Grind the required coffee beans (no preceding tasks)
  2. Heat some water (no preceding tasks)
  3. Brew an espresso using the ground coffee and the heated water (depends on 1 & 2)
  4. Froth some milk (no preceding tasks)
  5. Combine the frothed milk and the espresso (depends on 3 & 4)

or as a tree:

Grind   _
Coffee   \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

In Java, using the concurrency API, this would be:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

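        // Submit dependencies before the tasks that wait on them: with only two
        // pool threads, tasks blocked in get() on work that is still queued
        // behind them could otherwise occupy the whole pool and deadlock.
        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);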


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

Make sure that you add timeouts, though, so that your code does not wait forever for something to complete. Do this with Future.get(long, TimeUnit), and then handle the failure accordingly.
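
As a rough illustration of the timeout advice, here is a minimal sketch that waits a bounded time for a result and falls back if it does not arrive. The class name `TimeoutDemo`, the method `orderCoffee` and the fallback value are made up for this example; cancelling the task on timeout is one possible way to handle the failure, not the only one.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Hypothetical helper: brews for brewMillis, but the caller waits at most timeoutMillis.
    static String orderCoffee(long brewMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> coffee = executor.submit(() -> {
            Thread.sleep(brewMillis); // stands in for the real work
            return "coffee";
        });
        try {
            return coffee.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            coffee.cancel(true); // interrupt the worker so it does not keep running
            return "no coffee";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "no coffee";
        } catch (ExecutionException e) {
            return "no coffee"; // the task itself threw an exception
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(orderCoffee(10, 2000));  // finishes in time
        System.out.println(orderCoffee(3000, 100)); // times out, falls back
    }
}
```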

It is much nicer in Scala, however. As shown on the blog, the code to prepare some coffee would look something like this:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

where all the methods return a future (a typed future). For instance, grind would be something like this:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

For all the implementations, check out the blog, but that's all there is to it. You can also integrate Scala and Java easily. I really recommend doing this sort of thing in Scala instead of Java: Scala requires much less code, and the result is much cleaner and event-driven.

Derrops
4

The general programming model for tasks with dependencies is dataflow. The actor model is a simplified version in which each task has only one, though repeating, dependency. There are many actor libraries for Java, but very few for dataflow. See also: which-actor-model-library-framework-for-java, java-pattern-for-nested-callbacks

Community
Alexei Kaigorodov
1

Use a BlockingQueue. Put the output of task A into the queue, and task B blocks until something is available in the queue.

The docs contain example code to achieve this: http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html
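
A minimal sketch of that hand-off might look like the following. The class name `QueueHandoff` and the string values are placeholders chosen for this example, and plain threads are used only to keep it short; the same idea works with tasks submitted to an executor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueHandoff {

    // Runs task A and task B on separate threads; B blocks until A has produced its output.
    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] resultOfB = new String[1]; // written by B, read after join()

        Thread taskA = new Thread(() -> {
            try {
                queue.put("A-output"); // publish A's result
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread taskB = new Thread(() -> {
            try {
                String input = queue.take(); // blocks until A has put something
                resultOfB[0] = "B consumed " + input;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        taskB.start(); // starting B first is fine: it simply waits on the queue
        taskA.start();
        try {
            taskA.join();
            taskB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return resultOfB[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: B consumed A-output
    }
}
```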

Steve McLeod
1

Java 8 and later provide the CompletableFuture class.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

This is what you are looking for. It helps you build execution flows in which one task starts when another completes.
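
For the scenario in the question (A and C independent, B dependent on A's output), a sketch with CompletableFuture could look like this. The class name `DependentTasks` and the `taskA`, `taskB` and `taskC` methods are stand-ins invented for the example; the pool size of 2 is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DependentTasks {

    // Placeholder task bodies; real work would go here.
    static String taskA() { return "A-output"; }
    static String taskB(String outputOfA) { return "B(" + outputOfA + ")"; }
    static String taskC() { return "C-output"; }

    static String runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // A and C start immediately and run in parallel.
            CompletableFuture<String> a = CompletableFuture.supplyAsync(DependentTasks::taskA, pool);
            CompletableFuture<String> c = CompletableFuture.supplyAsync(DependentTasks::taskC, pool);
            // B is scheduled only once A has completed, and receives A's output.
            CompletableFuture<String> b = a.thenApplyAsync(DependentTasks::taskB, pool);
            // Wait for both B and C, then merge their results.
            return b.thenCombine(c, (bResult, cResult) -> bResult + ", " + cResult).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll()); // prints: B(A-output), C-output
    }
}
```

Because B is registered as a continuation of A rather than blocking inside a pool thread, no worker sits idle waiting for a dependency.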

0

What you need is a CountDownLatch.

final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

// thread c
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

new Thread() {
    public void run() {
        try {
            gate.await();
            // both thread a and thread c have completed
            // process thread b
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}.start();

As an alternative, depending on your scenario, you might also be able to use a BlockingQueue to implement the Producer-Consumer pattern. See the example on the documentation page.

Jeshurun
  • A `CountDownLatch` is overkill here, and according to the OP, task B is dependent on only task A, not both task A & C. That said, the -1 is simply for not handling the `InterruptedException` properly in snippet. – Tim Bender Jun 01 '12 at 18:18
  • Thanks, the idea for the code snippet was to show him how the CountDownLatch worked, not to show him how to handle exceptions properly. – Jeshurun Jun 01 '12 at 18:20
0

If task B is dependent on task A's output, I would first question whether or not task B really is a separate task. Separating the tasks would make sense if:

  • There is some non-trivial amount of work that task B can do before needing task A's results
  • Task B is a long-running process that handles output from many different instances of task A
  • There are other tasks (say, D) that also use task A's results

Assuming it is a separate task, you can let tasks A and B share a BlockingQueue so that task A can pass data to task B.
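
The second case in the list, where one long-running task B handles output from many instances of task A, could be sketched roughly as follows. The class name `LongRunningConsumer` and the `DONE` end-of-stream marker are assumptions made for this example; any agreed-upon sentinel or shutdown signal would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LongRunningConsumer {

    private static final String DONE = "__DONE__"; // hypothetical end-of-stream marker

    // Starts one consumer (task B) and several producers (instances of task A).
    static List<String> run(int producers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>(); // touched only by B until join()

        Thread taskB = new Thread(() -> {
            try {
                // Keep taking results until the end-of-stream marker arrives.
                for (String item = queue.take(); !DONE.equals(item); item = queue.take()) {
                    handled.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskB.start();

        List<Thread> tasksA = new ArrayList<>();
        for (int i = 0; i < producers; i++) {
            final int id = i;
            Thread taskA = new Thread(() -> queue.add("A" + id + "-output"));
            tasksA.add(taskA);
            taskA.start();
        }

        try {
            for (Thread taskA : tasksA) {
                taskA.join();
            }
            queue.add(DONE); // all producers finished: tell B to stop
            taskB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        // The order of the handled items depends on thread scheduling.
        System.out.println(run(3));
    }
}
```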

Tim Bender
0

Use this library: https://github.com/familysyan/TaskOrchestration. It manages task dependencies for you.

Edmond
0

There is a Java library specifically for this purpose called Dexecutor (disclaimer: I am the owner of this library).

Here is how you can achieve the desired result; you can read more about it here.

@Test
public void testDependentTaskExecution() {

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("A", "B");
    executor.addIndependent("C");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

}

private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<String, String> {

    public Task<String, String> provid(final String id) {

        return new Task<String, String>() {

            @Override
            public String execute() {
                try {
                    //Perform some task
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String result = id + "processed";
                return result;
            }

            @Override
            public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
                ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
                //Do some logic with parent result
                if ("B".equals(id) && firstParentResult.isSkipped()) {
                    return false;
                }
                return true;
            }
        };          
    }

}
craftsmannadeem