
In my project I frequently work with concurrent tasks using Java Futures. In one application, each concurrent task requires quite a big chunk of memory while it runs. Due to some other design choices, that memory is allocated and referenced in an object created outside the thread (see the more detailed example below).

To my surprise, the future holds a reference to this object, even after the future task (i.e., its calculation thread) has been completed. That is: if no other reference to this object is held elsewhere the object will not be freed, unless the future is freed - even though the task has completed.

My naive thinking was that limiting the number of concurrent threads would automatically limit the amount of resources (memory) held by the tasks. THIS IS NOT TRUE!

Consider the code below. In this example I create some tasks. During their calculation an ArrayList (which is an outer variable) grows in size. The method returns a Vector<Future<Double>>. Even if the task has completed and even if the scope of the ArrayList has been left, the Future still holds a reference to the ArrayList (via FutureTask.sync.callable).

To summarize:

  • The FutureTask holds a reference to the Callable, even if the Callable has completed.
  • The Callable holds references to the final outer variables used during calculation, even if the calculation has completed.
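The second point can be checked without a profiler. The following is a minimal sketch, not part of the original test: the class name `CaptureDemo` and its helper methods are made up for illustration, and it assumes the compiler stores a captured local variable in a field of the anonymous class (which is what javac does). Whether the FutureTask itself still references the Callable after completion may depend on the JDK version; the capture inside the Callable does not.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class CaptureDemo {

    /** Anonymous Callable that uses the final outer variable "model", as in the example. */
    public static Callable<Double> makeWorker(final List<Double> model) {
        return new Callable<Double>() {
            public Double call() {
                double sum = 0.0;
                for (Double value : model) sum += value.doubleValue();
                return sum;
            }
        };
    }

    /** Hypothetical helper: true if some field of the worker object refers to the target. */
    public static boolean holdsReferenceTo(Object worker, Object target) {
        for (Field field : worker.getClass().getDeclaredFields()) {
            try {
                field.setAccessible(true);
                if (field.get(worker) == target) return true;
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return false;
    }

    /** Runs the worker to completion, then checks whether it still references the model. */
    public static boolean stillHeldAfterCall() {
        List<Double> model = new ArrayList<Double>();
        model.add(1.0);
        model.add(2.0);
        Callable<Double> worker = makeWorker(model);
        try {
            worker.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return holdsReferenceTo(worker, model);
    }

    public static void main(String[] args) {
        System.out.println("Model still referenced after call(): " + stillHeldAfterCall());
    }
}
```

So anything that keeps the Callable reachable after completion also keeps the captured model reachable.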

Question: What is the best way to free resources held via the Future? (Of course, I know that local variables of the callable are released upon thread completion - this is not what I am asking for).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christianfries.com.
 *
 * Created on 17.08.2013
 */

package net.finmath.experiments.concurrency;

import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author Christian Fries
 *
 */
public class ConcurrencyTest {

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private int numberOfDoubles = 1024*1024/8;      // 1 MB
    private int numberOfModels  = 100;              // 100 * 1 MB

    /**
     * @param args
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ConcurrencyTest ct = new ConcurrencyTest();
        ct.concurrencyTest();
    }

    /**
     * @throws ExecutionException 
     * @throws InterruptedException 
     */
    public void concurrencyTest() throws InterruptedException, ExecutionException {
        Vector<Double> results = getResults();

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only results): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));
    }


    private Vector<Double> getResults() throws InterruptedException, ExecutionException {
        Vector<Future<Double>> resultsFutures = getResultsConcurrently();


        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);

        /*
         * At this point, we expect that no reference to the models is held
         * and the memory is freed.
         * However, the Future still reference each "model". 
         */

        Runtime.getRuntime().gc();
        System.out.println("Allocated memory (only futures): " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

        Vector<Double> results = new Vector<Double>(resultsFutures.size());
        for(int i=0; i<resultsFutures.size(); i++) {
            results.add(i, resultsFutures.get(i).get());
        }       

        return results;
    }

    private Vector<Future<Double>> getResultsConcurrently() {

        /*
         * At this point we allocate some model, which represents
         * something our workers work on.
         */
        Vector<ArrayList<Double>> models = new Vector<ArrayList<Double>>(numberOfModels);
        for(int i=0; i<numberOfModels; i++) {
            models.add(i, new ArrayList<Double>());
        }

        /*
         * Work on the models concurrently
         */
        Vector<Future<Double>> results = calculateResults(models);


        /*
         * Return the futures.
         * Note: We expect that no more reference is held to a model
         * once we are running out scope of this function AND the respective worker
         * has completed.
         */
        return results;
    }

    private Vector<Future<Double>> calculateResults(Vector<ArrayList<Double>> models) {
        Vector<Future<Double>> results = new Vector<Future<Double>>(models.size());
        for(int i=0; i<models.size(); i++) {
            final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;

            Callable<Double> worker = new  Callable<Double>() {
                public Double call() throws InterruptedException {

                    /*
                     * The models will perform some thread safe lazy init,
                     * which we simulate here, via the following line
                     */
                    for(int j=0; j<numberOfDoubles; j++) model.add(Math.random());

                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
                    for(Double value : model) sum += value.doubleValue();

                    Thread.sleep(1000);

                    Runtime.getRuntime().gc();
                    System.out.println("Model " + modelNumber + " completed. Allocated memory: " + (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()));

                    return sum;
                }
            };

            // The following line will add the future result of the calculation to the vector results               
            results.add(i, executor.submit(worker));
        }

        return results;
    }
}

Here is a screenshot of a debugger/profiler (this was done in another example). The FutureTask has completed (as is obvious from the result). However, the FutureTask holds a reference to the Callable. In this case the Callable holds a reference to the outer final variable arguments which contains some "big" object.

[Screenshot: debugger view of a completed FutureTask that still references its Callable]

(This example is closer to real life. Here Obba Server works on a spreadsheet using concurrent creation and processing of data; it is taken from a project of mine.)

UPDATE:

Given the answers of allprog and sgbj, I would like to add a few comments:

  • I accepted the answer of allprog because it answers the original question of how to free the resources held by the future. What I don't like about this solution is the dependence on an external library, but in this case it is a good hint.

  • That said, my preferred solution is that of sgbj and the one in my own answer below: avoid referencing "bigger" objects in the callable after call() has completed. Actually, my preferred solution is to avoid an anonymous class implementing Callable. Instead I define an inner class implementing Callable that has a constructor, receives all references to other objects via that constructor, and frees them at the end of the call() implementation.
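A minimal sketch of that last variant, under a few assumptions: the names `ModelWorkerSketch`, `ModelWorker` and `holdsModel()` are invented for illustration, and the nested class is declared static so that it carries no hidden reference to an enclosing instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ModelWorkerSketch {

    /** Named worker: receives its references via the constructor and frees them in call(). */
    public static class ModelWorker implements Callable<Double> {

        // Deliberately not final, so that it can be cleared at the end of call().
        private List<Double> model;

        public ModelWorker(List<Double> model) {
            this.model = model;
        }

        public Double call() {
            try {
                double sum = 0.0;
                for (Double value : model) sum += value.doubleValue();
                return sum;
            } finally {
                // Release the reference even if the calculation throws.
                model = null;
            }
        }

        /** Only for the demonstration below. */
        public boolean holdsModel() {
            return model != null;
        }
    }

    public static void main(String[] args) {
        List<Double> model = new ArrayList<Double>();
        model.add(1.5);
        model.add(2.5);
        ModelWorker worker = new ModelWorker(model);
        System.out.println("sum = " + worker.call() + ", holds model: " + worker.holdsModel());
    }
}
```

Such a worker can be submitted with executor.submit(new ModelWorker(model)) exactly like the anonymous class. Note that it can only be called once, since the model is gone afterwards.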

Christian Fries
  • One improvement that could be made is to call `model.clear();` before `return sum;` in your Future's call method. – sgbj Aug 17 '13 at 20:30
  • This would be a solution in the "toy example" below. However, in my application the models are provided from outside and in general could be used by other threads too. For that reason they are caching some data. Using a model.clear() would be equivalent to using a local variable for the generated data, which is unfortunately not the setup I have. – Christian Fries Aug 17 '13 at 20:37
  • One little OFF comment: try not to use Vector. It's an outdated implementation of List. Use ArrayList instead. – allprog Aug 17 '13 at 21:28
  • @John Vint: It's that ArrayList. Note that the code is just a toy example. In my real application it's some other object (if that is the confusion). That ArrayList is blown up to some 1 MB. It is retained by the Future... – Christian Fries Aug 17 '13 at 21:57
  • Did you find the answer for your question? – allprog Aug 19 '13 at 21:58

3 Answers


If you're concerned about the future task holding onto references to your models, you might try replacing

final ArrayList<Double> model = models.get(i);

with

final ArrayList<ArrayList<Double>> modelList = 
            new ArrayList<ArrayList<Double>>();
modelList.add(models.get(i));

Then at the beginning of your task's call method, do

ArrayList<Double> model = modelList.get(0);

and at the end of it write

model = null;
modelList.clear();

At least that made an improvement in your "toy example" without forcibly clearing out the user supplied model. Whether or not this can be applied to your actual work I cannot say.
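Put together, the steps above might look like the following sketch. The class name `HolderSketch` and its methods are hypothetical, and the summation stands in for the real work on the model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class HolderSketch {

    /** The worker captures only the one-element holder, never the model itself. */
    public static Callable<Double> makeWorker(final List<List<Double>> modelList) {
        return new Callable<Double>() {
            public Double call() {
                List<Double> model = modelList.get(0);   // local variable, gone when call() returns
                double sum = 0.0;
                for (Double value : model) sum += value.doubleValue();
                modelList.clear();                        // the worker can no longer reach the model
                return sum;
            }
        };
    }

    /** Runs one worker and reports whether the holder was emptied and the model left intact. */
    public static boolean holderEmptyAfterCall() {
        List<Double> model = new ArrayList<Double>();
        model.add(1.0);
        model.add(2.0);

        List<List<Double>> modelList = new ArrayList<List<Double>>();
        modelList.add(model);

        Callable<Double> worker = makeWorker(modelList);
        try {
            worker.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return modelList.isEmpty() && model.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println("Holder emptied, model untouched: " + holderEmptyAfterCall());
    }
}
```

The completed worker still references the (now empty) holder list, but that costs only a few bytes. A second call() on the same worker would fail because the holder is empty.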

sgbj

The Future itself is a FutureTask if you use the default ExecutorService implementations. The FutureTask keeps a reference to the Runnable or Callable that you submitted; consequently, any resources these reference are retained until the Future itself is freed.

The best approach to this kind of issue is to transform the Future into another Future that keeps only the result, and to return that one to the caller. This way you won't need a completion service or any additional hacking.

// Decorate the executor service with listening
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

// Submit the task
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        return Integer.valueOf(1234);
    }
});

// Transform the future to get rid of the memory consumption.
// The transformation must happen asynchronously, thus the
// Executor parameter is vital.
// Keep and hand out only the transformed future, not the original one.
// (In newer Guava versions this overload is named Futures.transformAsync.)
ListenableFuture<Integer> resultOnly = Futures.transform(future, new AsyncFunction<Integer, Integer>() {
    public ListenableFuture<Integer> apply(Integer input) {
        return Futures.immediateFuture(input);
    }
}, service);

This effectively implements the scheme you described but lifts the hacking off your shoulders, and allows for much higher flexibility later. It brings in Guava as a dependency, but I think it is worth the cost. I always use this scheme.
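If the extra dependency is a concern, the same idea (hand out a future that holds only the result) can probably be sketched with the JDK alone, assuming Java 8 or later. This is not the Guava code above but a swapped-in variant using CompletableFuture; the class `ResultOnlyFuture` and the method `submitResultOnly` are made-up names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultOnlyFuture {

    /**
     * Hypothetical helper: runs the task on the executor and returns a future that is
     * completed with the task's result. The task references the future, not vice versa.
     */
    public static <T> Future<T> submitResultOnly(Executor executor, final Callable<T> task) {
        final CompletableFuture<T> result = new CompletableFuture<T>();
        executor.execute(new Runnable() {
            public void run() {
                try {
                    result.complete(task.call());
                } catch (Throwable t) {
                    // Surfaces as an ExecutionException from result.get()
                    result.completeExceptionally(t);
                }
            }
        });
        return result;
    }

    /** Small demo wrapper so that callers do not have to handle checked exceptions. */
    public static int demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = submitResultOnly(executor, new Callable<Integer>() {
                public Integer call() {
                    return Integer.valueOf(1234);
                }
            });
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One trade-off of this sketch: cancelling the returned future does not interrupt the running task, because the future knows nothing about it.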

allprog

A solution that came to my mind is

  • Consume the futures in a completion service (via take(), which should free these futures), do this on an additional thread, and return new futures. The outer code only references the futures from the completion service.

In addition to the other answers: another possibility is to avoid holding the reference to the model in the callable, as in sgbj's answer. The following does the same in a different way (inspired by the answers to "Accessing constructor of an anonymous class"):

  • Pass the object "model" via a setter to a field of the anonymous class; upon completion, set that field to null.

        for(int i=0; i<models.size(); i++) {
    // REMOVED: final ArrayList<Double> model       = models.get(i);
            final int               modelNumber = i;
    
            Callable<Double> worker = new  Callable<Double>() {
    /*ADDED*/   private ArrayList<Double> model;
    /*ADDED*/   public Callable<Double> setModel(ArrayList<Double> model) { this.model = model; return this; };
    
                public Double call() throws InterruptedException {
    
                    /* ... */
    
                    /*
                     * Now the worker starts working on the model
                     */
                    double sum = 0.0;
    
                    /* ... */
    
    /*ADDED*/       model = null;
                    return sum;
                }
    /*ADDED*/   }.setModel(models.get(i));
    
    
            // The following line will add the future result of the calculation to the vector results
            results.add(i, executor.submit(worker));
        }
    
Community
Christian Fries