14

I have a fixedThreadPool that I am using to run a bunch of worker threads to achieve parallel execution of a task with many components.

When all threads have finished, I retrieve their results (which are quite large) using a method (getResult) and write them to a file.

Ultimately, to save memory and be able to see intermediate results, I'd like each thread to write its result to the file as soon as it finishes execution and then free its memory.

Ordinarily, I'd add code to that effect to the end of the run() method. However, certain other objects in this class also call these threads but do NOT want them to write their results to file; instead, they use the results to perform other calculations, which are eventually written to file.

So, I was wondering if it's possible to attach a callback function to the event of a thread finishing using the ExecutorService. That way, I can immediately retrieve its result and free the memory in that scenario, but not break the code when those threads are used in other scenarios.

Is such a thing possible?

Alex

5 Answers

8

If using Google Guava is an option, you could utilize the ListenableFuture interface in the following manner:

  1. Convert an ExecutorService to a ListeningExecutorService via MoreExecutors.listeningDecorator(existingExecutorService)
  2. The submit(Callable<V>) method of ListeningExecutorService has been narrowed to return a ListenableFuture, which is a subinterface of Future.
  3. ListenableFuture has an addListener() method so you can register a callback to be run when the future is completed.
Peter
6

You can add a callback for when a thread returns in Java 8+ using CompletableFuture as in the following, where t is the result of your long-running computation,

CompletableFuture.supplyAsync(() -> {
    T t = new T();
    // do something
    return t;
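}).thenAccept(t -> {
    // process t (thenAccept, because this callback returns nothing;
    // thenApply would require the lambda to return a value)
});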
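As a fuller illustration of the same idea, here is a minimal sketch that runs the tasks on an explicit fixed thread pool by passing it as the second argument to supplyAsync. The class name CompletionCallbackSketch, the "result-N" strings, and the in-memory list standing in for the output file are all assumptions made for the example, not part of the question's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionCallbackSketch {

    // Runs taskCount tasks on a fixed pool and hands each result to a callback
    // as soon as that task finishes. Returns everything the callback collected.
    static List<String> runAll(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Hypothetical stand-in for "write the result to a file".
        List<String> written = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[taskCount];
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                pending[i] = CompletableFuture
                        .supplyAsync(() -> "result-" + id, pool)  // the worker's computation
                        .thenAccept(written::add);                // callback on completion
            }
            // Wait until every task and its callback have finished.
            CompletableFuture.allOf(pending).join();
        } finally {
            pool.shutdown();
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(runAll(5));
    }
}
```

Callers that do not want the write-to-file behavior can simply skip the thenAccept stage and use the returned future directly, so the worker code itself stays unchanged.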

If you want to use callbacks in just Java 7, you could do something like,

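int x = 10;
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(x);
// Anonymous classes are used here because lambdas require Java 8.
final Future<T> result = fixedThreadPool.submit(new Callable<T>() {
    @Override
    public T call() {
        T t = new T();
        // do calculation
        return t;
    }
});
fixedThreadPool.submit(new Runnable() {
    @Override
    public void run() {
        long minutesToWait = 5;
        T t = null;
        try {
            // Blocks this pool thread until the first task finishes or the timeout expires.
            t = result.get(minutesToWait, TimeUnit.MINUTES);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOGGER.error(e);
        }
        if (t != null) {
            // process t
        }
    }
});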
Max
  • 1
    I think you forgot to pass fixedThreadPool to the `supplyAsync()` method in the first example. – xyman Jul 03 '19 at 10:05
4

ExecutorService#submit returns a Future<T>, which lets you retrieve the result; the Future#get method blocks until the computation has completed. Example:

ExecutorService executor = Executors.newFixedThreadPool(10);
Future<Long> future = executor.submit(new Callable<Long>(){
       @Override
       public Long call() throws Exception {
           long sum = 0;
           for (long i = 0; i <= 10000000L; i++) {
               sum += i;
           }
           return sum;
       }
});
Long result = future.get();
System.out.println(result);
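If blocking on each Future in submission order is a concern, one standard-library option that this answer does not mention is java.util.concurrent.ExecutorCompletionService, which hands back futures in the order they complete. The following is only a sketch under assumed names (CompletionOrderSketch, the delay values); the sleeps merely simulate work of different lengths.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderSketch {

    // Submits three tasks and collects their results as each one completes,
    // rather than in the order they were submitted.
    static List<Long> collectInCompletionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<Long> completed = new ExecutorCompletionService<>(pool);
        long[] delaysMillis = {300, 100, 200};  // hypothetical task durations
        for (long delay : delaysMillis) {
            completed.submit(() -> {
                Thread.sleep(delay);  // simulate work
                return delay;
            });
        }
        List<Long> results = new ArrayList<>();
        try {
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks only until the NEXT task finishes, whichever it is.
                results.add(completed.take().get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(collectInCompletionOrder());
    }
}
```

Each result can be written out and dropped inside the loop, so only the results that have not yet been consumed stay in memory.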
Gray
Subhrajyoti Majumder
  • He specifically did not want to change the `run()` method as stated in his question. – Gray Aug 26 '15 at 02:23
  • 3
    The reason you'd submit the task to a separate thread is so you can avoid blocking the current thread for a long computation, and you can use the current thread for other things. Since `future.get()` blocks, it should be called in a task which is also submitted to the `ExecutorService` as well. – Max Jan 24 '18 at 16:58
2

So, I was wondering if it's possible to attach a callback function to the event of a thread finishing using the ExecutorService.

Not directly, no, but there are a couple of ways you could accomplish this. The easiest way that comes to mind is to wrap your Runnable in another Runnable that does the reaping of the results.

So you'd do something like:

threadPool.submit(new ResultPrinter(myRunnable));
...

private static class ResultPrinter implements Runnable {
    private final MyRunnable myRunnable;
    public ResultPrinter(MyRunnable myRunnable) {
        this.myRunnable = myRunnable;
    }
    public void run() {
        myRunnable.run();
        Results results = myRunnable.getResults();
        // print results;
    }
}
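The same wrapping idea can be made generic so that each caller chooses its own callback, or none at all. This is a minimal sketch, not the code from the question: the names CallbackWrapperSketch and withCallback, the squaring task, and the list standing in for the output file are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CallbackWrapperSketch {

    // Wraps a task so that onDone runs on the worker thread right after the task returns.
    static <T> Runnable withCallback(Callable<T> task, Consumer<T> onDone) {
        return () -> {
            try {
                onDone.accept(task.call());
            } catch (Exception e) {
                // Hypothetical error handling; a real application would log or report this.
                System.err.println("Task failed: " + e);
            }
        };
    }

    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Integer> sink = new CopyOnWriteArrayList<>();  // stand-in for the output file
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            pool.submit(withCallback(() -> n * n, result -> sink.add(result)));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Code paths that need the raw result for further calculations can keep submitting the unwrapped task, so the existing run() logic does not change.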
Gray
  • The `myRunnable.getResults()` call will still block the executing thread; it is not a purely asynchronous mode with a "callback". – JasonMing Aug 21 '15 at 10:36
  • I assumed that the `getResult()` call didn't block and that the `run()` method stored all of the results in your object. Having a `get...` method block isn't the best pattern @JasonMing. – Gray Aug 26 '15 at 02:22
0

Project Loom

Project Loom will hopefully be bringing new features to the concurrency facilities of Java. Experimental builds are available now, based on early-access Java 17. The Loom team is soliciting feedback. For more info, see any of the most recent videos and articles by members of the team such as Ron Pressler or Alan Bateman. Loom has evolved, so study the most recent resources.

One convenient feature of Project Loom is making ExecutorService be AutoCloseable. This means we can use try-with-resources syntax to automatically shut down an executor service. The flow-of-control blocks at the end of the try block until all the submitted tasks are done/failed/canceled. After that, the executor service is automatically closed. This simplifies our code, and the visual structure of the code makes obvious our intent to wait for tasks to complete.
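For comparison, on a JDK where ExecutorService is not AutoCloseable, roughly the same wait-then-close flow can be written by hand with shutdown() and awaitTermination(). This is only a sketch of that manual equivalent; the class name ManualCloseSketch, the pool size, and the counter are made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ManualCloseSketch {

    // Submits taskCount trivial tasks, then blocks until all of them have finished.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger finished = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(finished::incrementAndGet);  // stand-in for real work
            }
        } finally {
            pool.shutdown();  // stop accepting new tasks; queued tasks still run
            try {
                // Keep waiting until every submitted task is done.
                while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
                    // still running; loop again
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();                  // give up and cancel remaining tasks
                Thread.currentThread().interrupt();  // preserve the interrupt status
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(10));
    }
}
```

The try-with-resources form folds this shutdown-and-wait boilerplate into the closing brace of the try block.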

Another important feature of Project Loom is virtual threads (a.k.a. fibers). Virtual threads are lightweight in terms of both memory and CPU.

  • Regarding memory, each virtual thread gets a stack that grows and shrinks as needed.
  • Regarding CPU, each of many virtual threads rides on top of any of several platform/kernel threads. This makes blocking very cheap. When a virtual thread blocks, it is “parked” (set aside) so that another virtual thread may continue to execute on the “real” platform/kernel thread.

Being lightweight means we can have many virtual threads at a time, millions even.

➥ The challenge of your Question is to react immediately when a submitted task is ready to return its result, without waiting for all the other tasks to finish. This is much simpler with Project Loom technology.

Just call get on each Future on yet another thread

Because we have nearly endless numbers of threads, and because blocking is so very cheap, we can submit a task that simply calls Future#get to wait for a result on every Future returned by every Callable we submit to an executor service. The call to get blocks, waiting until the Callable from whence it came has finished its work and returned a result.

Normally, we would want to avoid assigning a Future#get call to a conventional background thread. That thread would halt all further work until the blocked get method returns. But with Project Loom, that blocking call is detected, and its thread is “parked”, so other threads may continue. And when that blocked-call eventually returns, that too is detected by Loom, causing the no-longer-blocked-task’s virtual thread to soon be scheduled for further execution on a “real” thread. All this parking and rescheduling happens rapidly and automatically, with no effort on our part as Java programmers.

To demonstrate, the results of my tasks are stuffed into a concurrent map. To show that this is happening as soon as results are available, I override the put method on the ConcurrentSkipListMap class to do a System.out.println message.

The full example app is shown below. But the 3 key lines are as follows. Notice how we instantiate a Callable that sleeps a few seconds, and then returns the current moment as an Instant object. As we submit each of those Callable objects, we get back a Future object. For each returned Future, we submit another task, a Runnable, to our same executor service that merely calls Future#get, waiting for a result, and eventually posting that result to our results map.

final Callable < Instant > callable = new TimeTeller( nth );
final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.

Caveat: I am no expert on concurrency. But I believe my approach here is sound.

Caveat: Project Loom is still in the experimental stage, and is subject to change in both its API and its behavior.

package work.basil.example.callbacks;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App();
        app.demo();
    }

    private void demo ( )
    {
        System.out.println( "INFO - Starting `demo` method. " + Instant.now() );
        int limit = 10;
        ConcurrentNavigableMap < Integer, Instant > results = new ConcurrentSkipListMap <>()
        {
            @Override
            public Instant put ( Integer key , Instant value )
            {
                System.out.println( "INFO - Putting key=" + key + " value=" + value + " at " + Instant.now() );
                return super.put( key , value );
            }
        };
        try (
                ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
        )
        {
            for ( int i = 0 ; i < limit ; i++ )
            {
                final Integer nth = Integer.valueOf( i );
                final Callable < Instant > callable = new TimeTeller( nth );
                final Future < Instant > future = executorService.submit( callable ); // Submit first task: a `Callable`, an instance of our `TimeTeller` class.
                executorService.submit( ( ) -> results.put( nth , future.get() ) );   // Submit second task: a `Runnable` that merely waits for our first task to finish, and put its result into a map.
            }
        }
        // At this point flow-of-control blocks until:
        // (a) all submitted tasks are done/failed/canceled, and
        // (b) the executor service is automatically closed.
        System.out.println( "INFO - Ending `demo` method. " + Instant.now() );
        System.out.println( "limit = " + limit + " | count of results: " + results.size() );
        System.out.println( "results = " + results );
    }

    record TimeTeller(Integer id) implements Callable < Instant >
    {
        @Override
        public Instant call ( ) throws Exception
        {
            // To simulate work that involves blocking, sleep a random number of seconds.
            Duration duration = Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 1 , 55 ) );
            System.out.println( "id = " + id + " ➠ duration = " + duration );
            Thread.sleep( duration );
            return Instant.now();
        }
    }
}

When run:

INFO - Starting `demo` method. 2021-03-07T07:51:03.406847Z
id = 1 ➠ duration = PT27S
id = 2 ➠ duration = PT4S
id = 4 ➠ duration = PT6S
id = 5 ➠ duration = PT16S
id = 6 ➠ duration = PT34S
id = 7 ➠ duration = PT33S
id = 8 ➠ duration = PT52S
id = 9 ➠ duration = PT17S
id = 0 ➠ duration = PT4S
id = 3 ➠ duration = PT41S
INFO - Putting key=2 value=2021-03-07T07:51:07.443580Z at 2021-03-07T07:51:07.444137Z
INFO - Putting key=0 value=2021-03-07T07:51:07.445898Z at 2021-03-07T07:51:07.446173Z
INFO - Putting key=4 value=2021-03-07T07:51:09.446220Z at 2021-03-07T07:51:09.446623Z
INFO - Putting key=5 value=2021-03-07T07:51:19.443060Z at 2021-03-07T07:51:19.443554Z
INFO - Putting key=9 value=2021-03-07T07:51:20.444723Z at 2021-03-07T07:51:20.445132Z
INFO - Putting key=1 value=2021-03-07T07:51:30.443793Z at 2021-03-07T07:51:30.444254Z
INFO - Putting key=7 value=2021-03-07T07:51:36.445371Z at 2021-03-07T07:51:36.445865Z
INFO - Putting key=6 value=2021-03-07T07:51:37.442659Z at 2021-03-07T07:51:37.443087Z
INFO - Putting key=3 value=2021-03-07T07:51:44.449661Z at 2021-03-07T07:51:44.450056Z
INFO - Putting key=8 value=2021-03-07T07:51:55.447298Z at 2021-03-07T07:51:55.447717Z
INFO - Ending `demo` method. 2021-03-07T07:51:55.448194Z
limit = 10 | count of results: 10
results = {0=2021-03-07T07:51:07.445898Z, 1=2021-03-07T07:51:30.443793Z, 2=2021-03-07T07:51:07.443580Z, 3=2021-03-07T07:51:44.449661Z, 4=2021-03-07T07:51:09.446220Z, 5=2021-03-07T07:51:19.443060Z, 6=2021-03-07T07:51:37.442659Z, 7=2021-03-07T07:51:36.445371Z, 8=2021-03-07T07:51:55.447298Z, 9=2021-03-07T07:51:20.444723Z}
Basil Bourque