190

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

Michael Myers
Shahbaz

12 Answers

174

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}
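To connect the wrapper back to the question, here is a minimal sketch of how the completion callback could trigger the next submission, so each queue runs one task at a time on a shared pool without parking a thread in get(). The names SerialQueue, add, submitNext and onComplete are made up for illustration, and Callback is assumed to be a single-method interface you declare yourself. Unlike the wrapper above, this CallbackTask fires the callback in a finally block, on the assumption that a failing task should not stall the queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical callback interface; its shape is up to you. */
interface Callback {
  void complete();
}

/** The wrapper from the answer, repeated so this sketch compiles on its own. */
class CallbackTask implements Runnable {
  private final Runnable task;
  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  @Override
  public void run() {
    try {
      task.run();
    } finally {
      callback.complete(); // assumption: notify even when the task throws
    }
  }
}

/** Runs its tasks one at a time on a shared executor; no thread waits in get(). */
class SerialQueue {
  private final Queue<Runnable> pending = new ArrayDeque<>();
  private final Executor executor;
  private boolean running; // guarded by "this"

  SerialQueue(Executor executor) {
    this.executor = executor;
  }

  /** Enqueues a task and starts the chain if nothing is currently in flight. */
  synchronized void add(Runnable task) {
    pending.add(task);
    if (!running) {
      running = true;
      submitNext();
    }
  }

  /** Called with the lock held; hands the next task, if any, to the executor. */
  private void submitNext() {
    Runnable next = pending.poll();
    if (next == null) {
      running = false; // queue drained; the next add() restarts the chain
    } else {
      executor.execute(new CallbackTask(next, this::onComplete));
    }
  }

  /** Completion callback: runs on the worker thread that finished the task. */
  private synchronized void onComplete() {
    submitNext();
  }
}
```

Many SerialQueue instances can share one pool (for example Executors.newFixedThreadPool(n)); each instance holds at most one task in flight, and an idle queue occupies no thread at all.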

With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
erickson
  • Three answers in the blink of an eye! I like the CallbackTask, such a simple and straightforward solution. It looks obvious in retrospect. Thanks. Regarding others' comments about SingleThreadedExecutor: I may have thousands of queues which may have thousands of tasks. Each of them needs to process its tasks one at a time, but different queues can operate in parallel. That's why I am using a single global thread pool. I'm new to executors so please tell me if I am mistaken. – Shahbaz May 05 '09 at 18:47
  • Good pattern, I would however use [Guava's listenable future API](http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained) that provides a very good implementation of it. – Pierre-Henri Nov 13 '12 at 20:30
  • Doesn't this defeat the purpose of using Future? – takecare May 07 '15 at 11:19
  • @takecare It's a non-blocking alternative to waiting on `get()`, if that's what you mean. – erickson May 07 '15 at 13:50
  • @erickson Could you specify which `Callback` import it is? That would help a lot. There are so many, it's difficult to find. – Zelphir Kaltstahl Jan 28 '16 at 17:01
  • @Zelphir It was a `Callback` interface that you declare; not from a library. Nowadays I'd probably just use `Runnable`, `Consumer`, or `BiConsumer`, depending on what I need to pass back from the task to the listener. – erickson Jan 29 '16 at 15:52
  • But this callback is executed on the run() thread, not the thread in which the Runnable object was created. How do you get the callback to run on the thread in which the callback was created? – Bhargav Oct 24 '16 at 04:18
  • @Bhargav This is typical of callbacks: an external entity "calls back" to the controlling entity. Do you want the thread that created the task to block until the task has finished? Then what purpose is there in running the task on a second thread? If you allow the thread to continue, it will need to repeatedly check some shared state (probably in a loop, but depends on your program) until it notices an update (boolean flag, new item in queue, etc.) made by the true callback as described in this answer. It can then perform some additional work. – erickson Oct 24 '16 at 14:53
  • @erickson The first solution with a callback is just beautiful. I needed this since I'm using an extended `ThreadPoolExecutor` class which also supports priorities for tasks. Since `CompletableFuture` uses the `Supplier` interface as parameter instead of the `Runnable, V` or `Callable` ones, I was wondering how I could achieve this. I used a `Consumer` interface, though I extended it for additional methods. Absolutely works like a charm. – CRoemheld Jan 20 '18 at 18:59
64

In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.
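By default supplyAsync runs on the common ForkJoinPool. If you need to control how many tasks run at once, the overloads that take an Executor let you supply your own pool. The sketch below is illustrative only: BoundedAsyncDemo and greet are invented names, and the string work stands in for a real service call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BoundedAsyncDemo {

  /** Runs the pipeline on the given pool and returns the future of the final stage. */
  static CompletableFuture<String> greet(String name, ExecutorService pool) {
    return CompletableFuture
        .supplyAsync(() -> "hello, " + name, pool)  // first stage runs on a pool thread
        .thenApplyAsync(String::toUpperCase, pool)  // next stage is also bound to the pool
        .exceptionally(t -> "failed: " + t);        // fallback value if a stage throws
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2); // at most 2 tasks at once
    greet("world", pool)
        .thenAccept(System.out::println) // prints HELLO, WORLD
        .join();                         // only so the demo waits before shutdown
    pool.shutdown();
  }
}
```

The join() call is there only to keep the demo alive until the callback has run; in a long-lived application you would normally leave the chain to complete on its own.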

beat
Matt
  • How would one use a CompletableFuture with an executor (to limit the number of concurrent/parallel instances)? Would this be a hint: [submitting-futuretasks-to-an-executor-why-does-it-work](https://stackoverflow.com/questions/1710632/submitting-futuretasks-to-an-executor-why-does-it-work)? – user1767316 Jul 15 '20 at 10:21
54

Use Guava's listenable future API and add a callback. From the website:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
naXa stands with Ukraine
Pierre-Henri
26

You could extend the FutureTask class and override its done() method, then hand the FutureTask object to the ExecutorService. The done() method is invoked as soon as the FutureTask completes.
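A minimal sketch of that idea follows. NotifyingFutureTask and its onSuccess listener are invented for illustration, and the error handling (printing to stderr) is just a placeholder.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.function.Consumer;

/** FutureTask that passes its result to a listener from done(). */
class NotifyingFutureTask<V> extends FutureTask<V> {
  private final Consumer<V> onSuccess;

  NotifyingFutureTask(Callable<V> work, Consumer<V> onSuccess) {
    super(work);
    this.onSuccess = onSuccess;
  }

  @Override
  protected void done() {
    if (isCancelled()) {
      return; // nothing to report for a cancelled task
    }
    try {
      onSuccess.accept(get()); // does not block: the task has already completed
    } catch (ExecutionException e) {
      System.err.println("task failed: " + e.getCause()); // placeholder handling
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Because FutureTask implements Runnable, an instance can be passed straight to executor.execute(...), for example execute(new NotifyingFutureTask<>(() -> 6 * 7, r -> System.out.println("got " + r))). The listener runs on whichever thread completes the task.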

Machavity
Auguste
  • `then add the FutureTask object to the ExecutorService`, could you please tell me how to do this? – Gary Gauh Jun 18 '17 at 13:22
  • @GaryGauh [see this for more info](https://stackoverflow.com/questions/1710632/submitting-futuretasks-to-an-executor-why-does-it-work). You can extend FutureTask; we may call it MyFutureTask. Then use ExecutorService to submit MyFutureTask. The run method of MyFutureTask will run, and when MyFutureTask has finished, your done method will be called. The confusing part is that there are two FutureTasks, and in fact MyFutureTask is treated as a normal Runnable. – lin Jul 20 '17 at 06:35
17

ThreadPoolExecutor also has beforeExecute and afterExecute hook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.

Hook methods

This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
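As a rough illustration of the afterExecute hook, the sketch below forwards every completion to a listener. NotifyingExecutor and the BiConsumer listener are hypothetical names chosen for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Fixed-size pool that reports each finished task to a listener. */
class NotifyingExecutor extends ThreadPoolExecutor {
  private final BiConsumer<Runnable, Throwable> listener;

  NotifyingExecutor(int threads, BiConsumer<Runnable, Throwable> listener) {
    super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    this.listener = listener;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // Runs on the worker thread that executed r; t is null on normal completion.
    listener.accept(r, t);
  }
}
```

Note that with submit() the task is wrapped in a FutureTask, which captures any exception itself, so t is null in that case; only tasks started with execute() pass their exception to the hook.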

Michael Myers
Cem Catikkas
6

Use a CountDownLatch.

It's from java.util.concurrent and it's exactly the way to wait for several threads to complete execution before continuing.

To achieve the callback effect you're looking for, a little extra work is required: handle it yourself in a separate thread that waits on the CountDownLatch and then notifies whatever needs to be notified. There is no native support for callbacks, or anything to that effect.


EDIT: now that I understand your question better, I think you are reaching too far, unnecessarily. Take a regular single-threaded executor (Executors.newSingleThreadExecutor()), give it all the tasks, and it will do the queueing natively.

Yuval Adam
  • Using SingleThreadExecutor, what is the best way to know that all the tasks have completed? I saw an example that uses `while (!executor.isTerminated())`, but this doesn't seem very elegant. I implemented a callback feature for each worker and increment a count, which works. – Bear Jul 21 '14 at 14:02
5

If you want to make sure that no tasks run at the same time, use a single-threaded executor (Executors.newSingleThreadExecutor()). The tasks will be processed in the order they are submitted. You don't even need to hold the tasks yourself; just submit them to the executor.
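A small sketch of that behaviour, with invented names (SingleThreadOrderDemo, runInOrder): tasks are handed over without blocking, and the recorded order matches the submission order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {

  /** Submits n tasks and returns the order in which they actually ran. */
  static List<Integer> runInOrder(int n) throws InterruptedException {
    List<Integer> order = new CopyOnWriteArrayList<>();
    ExecutorService exec = Executors.newSingleThreadExecutor();
    for (int i = 0; i < n; i++) {
      final int id = i;
      exec.execute(() -> order.add(id)); // returns immediately; the executor queues the task
    }
    exec.shutdown();                             // accept no new tasks, finish the queued ones
    exec.awaitTermination(10, TimeUnit.SECONDS); // only so the demo can read the result
    return order;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
  }
}
```

Keep in mind that each single-threaded executor owns a thread, so this fits a handful of queues better than thousands of them.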

basszero
3

Simple code to implement Callback mechanism using ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

output:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Key notes:

  1. If you want to process tasks sequentially in FIFO order, replace newFixedThreadPool(5) with newFixedThreadPool(1)
  2. If you want to process the next task only after analysing the result of the previous task, un-comment the line below (note that future.get() blocks the submitting thread)

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. You can replace newFixedThreadPool() with one of

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    depending on your use case.

  4. If you want to handle callback method asynchronously

    a. Pass a shared ExecutorService or ThreadPoolExecutor to Callable task

    b. Convert your callback method to a Callable/Runnable task

    c. Push callback task to ExecutorService or ThreadPoolExecutor
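Steps a to c of the last note could look roughly like this. AsyncCallbackCallable and its constructor parameters are hypothetical; the point is only that the callback is pushed to an executor as its own task instead of running inline in call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Computes a value, then hands the callback to another executor as a separate task. */
class AsyncCallbackCallable<V> implements Callable<V> {
  private final Callable<V> work;
  private final Consumer<V> callback;
  private final Executor callbackExecutor; // step a: shared executor passed in

  AsyncCallbackCallable(Callable<V> work, Consumer<V> callback, Executor callbackExecutor) {
    this.work = work;
    this.callback = callback;
    this.callbackExecutor = callbackExecutor;
  }

  @Override
  public V call() throws Exception {
    V result = work.call();
    // steps b and c: wrap the callback in a Runnable and push it to the executor
    callbackExecutor.execute(() -> callback.accept(result));
    return result;
  }
}
```

The worker thread is free again as soon as the callback has been queued, so a slow callback no longer delays the next task on that worker.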

Ravindra babu
2

This is an extension to Pache's answer using Guava's ListenableFuture.

In particular, Futures.transform() returns ListenableFuture so can be used to chain async calls. Futures.addCallback() returns void, so cannot be used for chaining, but is good for handling success/failure on an async completion.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
// A lambda parameter may not reuse the name of a local variable that is in scope
ListenableFuture<Cursor> cursor =
    Futures.transform(database, db -> db.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, rows -> cursorToFooList(rows));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTE: In addition to chaining async tasks, Futures.transform() also allows you to schedule each task on a separate executor (Not shown in this example).

bcorso
1

Just to add to Matt's answer, which helped, here is a more fleshed-out example to show the use of a callback.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

The output is:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
Old Jack
1

You may use an implementation of Callable such as

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

where CallbackInterface is something very basic like

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

and now the main class will look like this

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
Deepika Anand
0

You can submit tasks to a Java executor as Callables and read each result from the returned Future once the task completes.

  • Use Callable for tasks instead of Runnable
  • The submitting thread can perform unrelated work while the tasks execute asynchronously
  • Multiple tasks can be executed, and the result of each one retrieved

The following class returns a random integer from a task and prints it on the main thread:

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorCallable {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // fixed number of threads, fed from a thread-safe blocking queue
        ExecutorService service = Executors.newFixedThreadPool(100);

        // submit the task for execution
        Future<Integer> result = service.submit(new Task());

        //*** perform some unrelated operations

        // get() blocks until the task has completed
        System.out.println("Result of submitted task is : " + result.get());

        System.out.println("Thread name " + Thread.currentThread().getName());

        // without this, the pool's non-daemon threads keep the JVM alive
        service.shutdown();
    }

    static class Task implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return new Random().nextInt();
        }
    }
}

The Output is:

Result of submitted task is : 16645418
Thread name main