51

similar questions:

I have an object with a method I would like to expose to library clients (especially scripting clients) as something like:

interface MyNiceInterface
{
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg);
    public Future<Baz> doSomething(Foo fooArg, Bar barArg);
    // doSomethingAndBlock is the straightforward way;
    // doSomething has more control but deals with
    // a Future and that might be too much hassle for
    // scripting clients
}

but the primitive "stuff" I have available is a set of event-driven classes:

interface BazComputationSink
{
    public void onBazResult(Baz result);
}

class ImplementingThing
{
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink);
}

where ImplementingThing takes inputs, does some arcane stuff like enqueueing things on a task queue, and then later when a result occurs, sink.onBazResult() gets called on a thread that may or may not be the same thread as ImplementingThing.doSomethingAsync() was called.

Is there a way I can use the event-driven functions I have, along with concurrency primitives, to implement MyNiceInterface so scripting clients can happily wait on a blocking thread?

edit: can I use FutureTask for this?

Community
  • 1
  • 1
Jason S
  • 184,598
  • 164
  • 608
  • 970

7 Answers7

48

Using your own Future implemenation:

public class BazComputationFuture implements Future<Baz>, BazComputationSink {

    private volatile Baz result = null;
    private volatile boolean cancelled = false;
    private final CountDownLatch countDownLatch;

    public BazComputationFuture() {
        countDownLatch = new CountDownLatch(1);
    }

    @Override
    public boolean cancel(final boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false;
        } else {
            countDownLatch.countDown();
            cancelled = true;
            return !isDone();
        }
    }

    @Override
    public Baz get() throws InterruptedException, ExecutionException {
        countDownLatch.await();
        return result;
    }

    @Override
    public Baz get(final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        countDownLatch.await(timeout, unit);
        return result;
    }

    @Override
    public boolean isCancelled() {
        return cancelled;
    }

    @Override
    public boolean isDone() {
        return countDownLatch.getCount() == 0;
    }

    public void onBazResult(final Baz result) {
        this.result = result;
        countDownLatch.countDown();
    }

}

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    BazComputationFuture future = new BazComputationFuture();
    doSomethingAsync(fooArg, barArg, future);
    return future;
}

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    return doSomething(fooArg, barArg).get();
}

The solution creates a CountDownLatch internally which is cleared once the callback is received. If the user calls get, the CountDownLatch is used to block the calling thread until the computation completes and call the onBazResult callback. The CountDownLatch will assure that if the callback occurs before get() is called the get() method will return immediately with a result.

Michael Barker
  • 14,153
  • 4
  • 48
  • 55
  • +1 because I think I understand it.... but could you explain the concurrency aspects? (the use of countdown latch) – Jason S Feb 01 '10 at 22:58
  • ok thanks. Hmmmm... (confused) I thought I read that FutureTask itself doesn't actually create any threading, it's just something that you can use to implement a Future as well as submit it on an Executor. – Jason S Feb 02 '10 at 13:36
  • Out of curiosity. Why not declare private boolean cancelled as volatile? – John Vint Feb 02 '10 at 14:06
  • 1
    I don't think that it is needed as it will only be set/read from the same thread so changes to the value will not need to be see across threads in the scenario described. If allow a separate thread from the caller to see the future and cancel requests then it should be volatile. – Michael Barker Feb 02 '10 at 16:16
  • Ok fair enough, wasnt sure if you had planned on it being visible to other threads. – John Vint Feb 02 '10 at 18:15
  • @Jason, I may have misread the FutureTask source, should be possible using a Callable. – Michael Barker Feb 02 '10 at 18:43
  • I may have misread it too, I just tried to implement something + I don't think it will work w/ asynchronous callbacks. thanks for your post! (p.s. could you use CountDownLatch.getCount() and get rid of the "done" flag?) – Jason S Feb 02 '10 at 19:01
  • @JasonS maybe this could help ? [Async2sync](https://github.com/mejmo/async2sync) – Mejmo Oct 14 '15 at 17:32
  • Thanks, but this was a project I worked on 5.5 years ago so I have no idea whether that would have helped. – Jason S Oct 14 '15 at 17:39
  • https://gist.github.com/prasannaboppe/0af563860b90353032e9f744c900b3f7 – Prasanna Feb 21 '20 at 06:04
17

Well, there is the simple solution of doing something like:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
  final AtomicReference<Baz> notifier = new AtomicReference();
  doSomethingAsync(fooArg, barArg, new BazComputationSink() {
    public void onBazResult(Baz result) {
      synchronized (notifier) {
        notifier.set(result);
        notifier.notify();
      }
    }
  });
  synchronized (notifier) {
    while (notifier.get() == null)
      notifier.wait();
  }
  return notifier.get();
}

Of course, this assumes that your Baz result will never be null…

Paul Wagland
  • 27,756
  • 10
  • 52
  • 74
  • Inside callback (BazComputationSink() {} ), it'd complain - initialize notifier. And if initialized to null, I get NullPointerException thrown for the first hit, cause my results aren't ready yet back in async task. – Khulja Sim Sim Jun 19 '14 at 01:46
  • 2
    This is not recommended: if your callback returns immediately , notify will be called before wait and you get a deadlock (this may happen if you have a cache logic) – Patrick Sep 14 '15 at 14:05
  • @for3st that is why you always check the condition before you go into the wait, which is why that for loop is there. What you are concerned about cannot happen with the core as presented. – Paul Wagland Sep 14 '15 at 15:31
  • Looks good, but what if result can be null? Also it is not clear why there is synchronized blocks and while loop. – vyndor Aug 17 '18 at 13:11
14

The google guava library has an easy to use SettableFuture that makes this problem very simple (around 10 lines of code).

public class ImplementingThing {

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) {
    try {
        return doSomething(fooArg, barArg).get();
    } catch (Exception e) {
        throw new RuntimeException("Oh dear");
    }
};

public Future<Baz> doSomething(Foo fooArg, Bar barArg) {
    final SettableFuture<Baz> future = new SettableFuture<Baz>();
    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
        @Override
        public void onBazResult(Baz result) {
            future.set(result);
        }
    });
    return future;
};

// Everything below here is just mock stuff to make the example work,
// so you can copy it into your IDE and see it run.

public static class Baz {}
public static class Foo {}
public static class Bar {}

public static interface BazComputationSink {
    public void onBazResult(Baz result);
}

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Baz baz = new Baz();
            sink.onBazResult(baz);
        }
    }).start();
};

public static void main(String[] args) {
    System.err.println("Starting Main");
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null));
    System.err.println("Ending Main");
}
kybernetikos
  • 8,281
  • 1
  • 46
  • 54
7

This is dead simple with RxJava 2.x:

try {
    Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
            doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
            .toFuture().get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Or without Lambda notation:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() {
                @Override
                public void subscribe(SingleEmitter<Baz> emitter) {
                    doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                        @Override
                        public void onBazResult(Baz result) {
                            emitter.onSuccess(result);
                        }
                    });
                }
            }).toFuture().get();

Even simpler:

Baz baz = Single.create((SingleEmitter<Baz> emitter) ->
                doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result)))
                .blockingGet();

Kotlin Version:

val baz = Single.create<Baz> { emitter -> 
    doSomethingAsync(fooArg, barArg) { result -> emitter.onSuccess(result) } 
}.blockingGet()
Emanuel Moecklin
  • 28,488
  • 11
  • 69
  • 85
4

A very simple example, just to understand CountDownLatch without any extra code.

A java.util.concurrent.CountDownLatch is a concurrency construct that allows one or more threads to wait for a given set of operations to complete.

A CountDownLatch is initialized with a given count. This count is decremented by calls to the countDown() method. Threads waiting for this count to reach zero can call one of the await() methods. Calling await() blocks the thread until the count reaches zero.

Below is a simple example. After the Decrementer has called countDown() 3 times on the CountDownLatch, the waiting Waiter is released from the await() call.

You can also mention some TimeOut to await.

CountDownLatch latch = new CountDownLatch(3);

Waiter      waiter      = new Waiter(latch);
Decrementer decrementer = new Decrementer(latch);

new Thread(waiter)     .start();
new Thread(decrementer).start();

Thread.sleep(4000);
public class Waiter implements Runnable{

    CountDownLatch latch = null;

    public Waiter(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Waiter Released");
    }
}

//--------------

public class Decrementer implements Runnable {

    CountDownLatch latch = null;

    public Decrementer(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {

        try {
            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();

            Thread.sleep(1000);
            this.latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Reference

If you don't want to use a CountDownLatch or your requirement is something same as Facebook like and unlike functionality. Means if one method is being called then don't call the other method.

In that case you can declare a

private volatile Boolean isInprocessOfLikeOrUnLike = false;

and then you can check in the beginning of your method call that if it is false then call method otherwise return.. depends upon your implementation.

AZ_
  • 21,688
  • 25
  • 143
  • 191
4

Here's a more generic solution based on Paul Wagland's answer:

public abstract class AsyncRunnable<T> {
    protected abstract void run(AtomicReference<T> notifier);

    protected final void finish(AtomicReference<T> notifier, T result) {
        synchronized (notifier) {
            notifier.set(result);
            notifier.notify();
        }
    }

    public static <T> T wait(AsyncRunnable<T> runnable) {
        final AtomicReference<T> notifier = new AtomicReference<>();

        // run the asynchronous code
        runnable.run(notifier);

        // wait for the asynchronous code to finish
        synchronized (notifier) {
            while (notifier.get() == null) {
                try {
                    notifier.wait();
                } catch (InterruptedException ignore) {}
            }
        }

        // return the result of the asynchronous code
        return notifier.get();
    }
}

Here's an example how to use it::

    String result = AsyncRunnable.wait(new AsyncRunnable<String>() {
        @Override
        public void run(final AtomicReference<String> notifier) {
            // here goes your async code, e.g.:
            new Thread(new Runnable() {
                @Override
                public void run() {
                    finish(notifier, "This was a asynchronous call!");
                }
            }).start();
        }
    });

A more verbose version of the code can be found here: http://pastebin.com/hKHJUBqE

EDIT: The example related to the question would be:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) {
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() {
        @Override
        protected void run(final AtomicReference<Baz> notifier) {
            doSomethingAsync(fooArg, barArg, new BazComputationSink() {
                public void onBazResult(Baz result) {
                    synchronized (notifier) {
                        notifier.set(result);
                        notifier.notify();
                    }
                }
            });
        }
    });
}
Emanuel Moecklin
  • 28,488
  • 11
  • 69
  • 85
  • I comprehend your answer but I can't see how to map it to my question of over six years ago. – Jason S Mar 15 '16 at 03:12
  • I updated the answer so you can map it to the question – Emanuel Moecklin Mar 15 '16 at 03:31
  • BTW if you have to do this in may places I'd consider using this: https://github.com/ReactiveX/RxJava. ReactiveX is a perfect match for your problem although it has a steep learning curve (at least for me). This is a great introduction: https://www.youtube.com/watch?v=_t06LRX0DV0. – Emanuel Moecklin Mar 15 '16 at 03:41
  • ok thanks! my problem is old and gone but you never know, there have been times that I've come up with a problem and I search on SO and it pulls up an old question of mine. – Jason S Mar 15 '16 at 14:09
  • I simply wanted to share my solution. You never know who might find it useful. – Emanuel Moecklin Mar 15 '16 at 14:43
  • I have more interesting case, in your last code example, what if BazComputationSink must do work on the same thread as doSomethingAndBlock invoked? In general, there is same thread where method invokes and we must block farther execution because we must wait for the callback from different place that is also happening in the same thread. It could sounds strange an wrong design but there is existing architecture with customers and it is impossible to change API for them ... – Yuriy Chernyshov Aug 24 '16 at 20:05
4

The simplest way (which works for me) is to

  1. Create a blocking queue
  2. Call the asynchronous method - use a handler that offers the result to that blocking queue.
  3. Poll the queue (that's where you block) for the result.

    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) throws InterruptedException {
        final BlockingQueue<Baz> blocker = new LinkedBlockingQueue();
        doSomethingAsync(fooArg, barArg, blocker::offer);
        // Now block until response or timeout
        return blocker.poll(30, TimeUnit.SECONDS);
    }
    
oguz ismail
  • 1
  • 16
  • 47
  • 69
Joel Shemtov
  • 3,008
  • 2
  • 22
  • 22