My current code uses a series of asynchronous processes that culminate in results. I need to wrap each of them so that it is accessed through a synchronous method that returns the result. I want to use executor services for this, so that many of these can run at the same time. I have a feeling that Future might be relevant to my implementation, but I can't figure out a good way to make this happen.

What I have now:

public class DoAJob {
  ResultObject result;

  public void stepOne() {
    // Passes self in for a callback
    otherComponent.doStepOne(this);
  }

  // Called back by otherComponent once it has completed doStepOne
  public void stepTwo(IntermediateData d) {
    otherComponent.doStepTwo(this, d);
  }

  // Called back by otherComponent once it has completed doStepTwo
  public void stepThree(ResultObject resultFromOtherComponent) {
    result = resultFromOtherComponent;
    // Done with the process
  }
}

This has worked pretty well internally, but now I need to map my process into a synchronous method with a return value like:

public ResultObject getResult(){
  // ??? What goes here ???
}

Does anyone have a good idea about how to implement this elegantly?

irondwill
  • One thing is missing in this spec: how do you plan to combine the ResultObject of the different tasks into a single one at the end? – Eyal Schneider Apr 04 '13 at 15:52
  • I'm not sure I understand the question. Normally the process is kicked off by new DoAJob().stepOne(); the stepTwo() method is then triggered by a callback from otherComponent. At the end of the chain of callbacks, the ResultObject is correctly populated. Any intermediate data is integrated into the final result. – irondwill Apr 04 '13 at 16:24

5 Answers

9

If you want to turn an asynchronous operation (one that invokes a callback when finished) into a synchronous, blocking one, you can use a blocking queue. You can wrap this up in a Future object if you wish.

  1. Define a blocking queue which can hold just one element:

    BlockingQueue<Result> blockingQueue = new ArrayBlockingQueue<Result>(1);

  2. Start your asynchronous process (which runs in the background), and write the callback so that, when the process is done, it adds its result to the blocking queue.

  3. In your foreground/application thread, have it take() from the queue, which blocks until an element becomes available:

    Result result = blockingQueue.take();
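The three steps above could be applied to the question's callback chain roughly as follows. This is only a sketch under assumptions: FakeComponent is a hypothetical stand-in for otherComponent that calls back on a new thread, QueueJob is an illustrative name, and String stands in for IntermediateData and ResultObject.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for otherComponent: it finishes each step on a
// separate thread and then calls back into the job.
class FakeComponent {
    void doStepOne(QueueJob job) {
        new Thread(() -> job.stepTwo("intermediate")).start();
    }

    void doStepTwo(QueueJob job, String data) {
        new Thread(() -> job.stepThree(data + " -> result")).start();
    }
}

class QueueJob {
    private final FakeComponent otherComponent = new FakeComponent();
    // Capacity 1: the callback chain produces exactly one result per job.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

    // Synchronous entry point: starts the chain, then blocks for the result.
    public String getResult() throws InterruptedException {
        otherComponent.doStepOne(this);
        return queue.take();
    }

    // Called back once step one has completed.
    void stepTwo(String data) {
        otherComponent.doStepTwo(this, data);
    }

    // Called back once step two has completed.
    void stepThree(String result) {
        // offer() does not block the callback thread; the queue is empty here.
        queue.offer(result);
    }
}
```

Because the queue buffers one element, the callback can finish before the caller reaches take() without losing the result. Each QueueJob instance is meant to be used for a single getResult() call.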

I wrote something similar before (a foreground thread needed to block while waiting for an asynchronous response from a remote machine) using something like a Future; you can find example code here.
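For the Future wrapping mentioned above, one possible shape on Java 8 or later is CompletableFuture. This is a swapped-in technique rather than the linked code, and FutureJob, start(), and failed() are illustrative names; the thread in start() is a hypothetical stand-in for the asynchronous component.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class FutureJob {
    // Completed exactly once by the final callback.
    private final CompletableFuture<String> done = new CompletableFuture<>();

    // Starts the asynchronous chain and returns immediately.
    public Future<String> start() {
        // Hypothetical async component: calls back on another thread.
        new Thread(() -> stepThree("result")).start();
        return done;
    }

    // Final callback: publishes the result to anyone blocked in get().
    void stepThree(String result) {
        done.complete(result);
    }

    // Error callback: makes get() throw ExecutionException instead of hanging.
    void failed(Throwable cause) {
        done.completeExceptionally(cause);
    }
}
```

A caller can then do other work and later block with new FutureJob().start().get(), or use the get(timeout, unit) overload to avoid waiting forever if a callback never arrives.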

npgall
  • A `SynchronousQueue` is probably better for this purpose – ZhongYu Apr 04 '13 at 16:33
  • I've been there before I'm afraid (with the code I linked to actually). A SynchronousQueue may cause the background thread to block or receive an exception, if there was a race condition such that the background thread completed before the foreground thread called take(). If this was wrapped in a Future as the OP mentioned, it might be a common occurrence if the application did some other work before calling Future.get(). – npgall Apr 04 '13 at 17:32
1

I've done something similar with the Guava library; these links might point you in the right direction:

Is it possible to chain async calls using Guava?

https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

will vuong
0

I recommend using invokeAll(..). It submits a set of tasks to the executor and blocks until the last one completes (successfully or with an exception). It then returns a list of completed Future objects, so you can loop over them and merge the results into a single ResultObject.

If you wish to run only a single task in a synchronous manner, you can use the following:

executor.invokeAll(Collections.singleton(task)); 
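A minimal sketch of the invokeAll(..) approach follows, assuming each task is a Callable that already performs one complete job. InvokeAllSketch and runAll are illustrative names, String stands in for ResultObject, and the pool size of 4 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    // Runs all tasks concurrently and returns their results in task order.
    static List<String> runAll(List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task has completed or thrown.
            for (Future<String> future : executor.invokeAll(tasks)) {
                // get() returns at once here; a failed task surfaces
                // as an ExecutionException.
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```

The returned futures are in the same order as the submitted tasks, so the results can be merged positionally.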

--edit--

I think I understand your needs better now. I assume that you need a way to submit independent sequences of tasks. Please take a look at the code I posted in this answer.

Eyal Schneider
  • I didn't illustrate this well in my example, but steps are dependent upon the previous step being completed. For example, stepTwo() is executed upon remote completion of stepOne(). stepTwo() is essentially the completion callback for the component called in stepOne() – irondwill Apr 04 '13 at 16:32
0

If you like to get your hands dirty, you can do this:

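ResultObject result;

public ResultObject stepOne() throws InterruptedException {
    otherComponent.doStepOne(this);

    synchronized (this) {
        // Loop to guard against spurious wakeups
        while (result == null) this.wait();
        return result;
    }
}

public void stepThree(ResultObject resultFromOtherComponent) {
    synchronized (this) {
        // Write under the same lock so the waiting thread sees the value
        result = resultFromOtherComponent;
        this.notifyAll();
    }
}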

Or you can use higher-level concurrency tools, like BlockingQueue, Semaphore, CountDownLatch, Phaser, etc.
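As one example of the higher-level route, here is a rough sketch using CountDownLatch in place of wait()/notify(). LatchJob is an illustrative name, String stands in for ResultObject, and the thread in stepOne() is a hypothetical substitute for otherComponent.

```java
import java.util.concurrent.CountDownLatch;

class LatchJob {
    // Count of 1: released once, by the final callback.
    private final CountDownLatch done = new CountDownLatch(1);
    // Written before countDown() and read after await(), so the
    // latch makes the write visible to the waiting thread.
    private String result;

    // Synchronous entry point: starts the chain and waits for the last step.
    public String getResult() throws InterruptedException {
        stepOne();
        done.await();
        return result;
    }

    private void stepOne() {
        // Hypothetical async component: calls back on another thread.
        new Thread(() -> stepThree("result")).start();
    }

    // Final callback: stores the result, then releases the waiting thread.
    void stepThree(String resultFromOtherComponent) {
        result = resultFromOtherComponent;
        done.countDown();
    }
}
```

If the callback runs before the caller reaches await(), the latch is already at zero and await() returns immediately, so there is no lost-notification problem.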

Note that DoAJob is not thread-safe: trouble is guaranteed if two threads call stepOne on the same instance at the same time.

ZhongYu
  • Yeah I may end up having to do something like that since it appears to be a fairly specific use case & architecture. I was hoping to avoid having to implement wait() loops or junking up my code with too many synchronized() calls. – irondwill Apr 04 '13 at 16:38
0

Bumerang is my async-only HTTP request library, built for Android HTTP requests in Java -> https://github.com/hanilozmen/Bumerang . I needed to make synchronous calls without touching my library. Here is my complete code. npgall's answer inspired me, thanks! A similar approach can be applied to all kinds of async libraries.

public class TestActivity extends Activity {

MyAPI api = (MyAPI) Bumerang.get().initAPI(MyAPI.class);
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(1);

static int indexForTesting;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_test);
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            for(int i = 0; i < 10; i++) {
                getItems();
                try {
                    Object response = blockingQueue.take(); // waits for the response
                    Log.i("TAG", "index " + indexForTesting + " finished. Response " + response.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });
    t.start();
}

void getItems() {
    Log.i("TAG", "index " + ++indexForTesting + " started");
    api.getItems(new ResponseListener<Response<List<ResponseModel>>>() {
        @Override
        public void onSuccess(Response<List<ResponseModel>> response) {
            List<ResponseModel> respModel = response.getResponse();
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onError(Response<List<ResponseModel>> response) {
            Log.i("onError", response.toString());
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

}

hanilozmen