269

I would like to know the difference between CompletableFuture,Future and Observable RxJava.

What I know is all are asynchronous but

Future.get() blocks the thread

CompletableFuture gives the callback methods

RxJava Observable --- similar to CompletableFuture with other benefits(not sure)

For example: if client needs to make multiple service calls and when we use Futures (Java) Future.get() will be executed sequentially...would like to know how its better in RxJava..

And the documentation http://reactivex.io/intro.html says

It is difficult to use Futures to optimally compose conditional asynchronous execution flows (or impossible, since latencies of each request vary at runtime). This can be done, of course, but it quickly becomes complicated (and thus error-prone) or it prematurely blocks on Future.get(), which eliminates the benefit of asynchronous execution.

Really interested to know how RxJava solves this problem. I found it difficult to understand from the documentation.

Hash
  • 4,647
  • 5
  • 21
  • 39
shiv455
  • 7,384
  • 19
  • 54
  • 93
  • Have you read the documentation for each? I'm totally unfamiliar with RxJava, but the documentation appears extremely thorough at a glance. It doesn't seem particularly comparable to the two futures. – FThompson Feb 11 '16 at 02:42
  • i have gone through but not able to get how different its from Java futures...correct me if im wrong – shiv455 Feb 11 '16 at 03:00
  • How are observables similar to futures? – FThompson Feb 11 '16 at 03:08
  • i guess both are used for asynchronous operations – shiv455 Feb 11 '16 at 03:10
  • Beyond that, I don't see anything similar about them. The documentations explain totally different concepts as far as I can tell. – FThompson Feb 11 '16 at 03:11
  • 2
    would like to know where it is different like is it different in thread management?? EX:Future.get() blocks the thread....how it will be handled in Observable??? – shiv455 Feb 11 '16 at 03:13
  • That's a far more concise and better question. Is it answered by the [async documentation page](https://github.com/ReactiveX/RxJava/wiki/Async-Operators)? – FThompson Feb 11 '16 at 03:19
  • 2
    atleast its bit confusing for me...a high level difference would be really helpful!! – shiv455 Feb 11 '16 at 03:21
  • When you said ~"**Future.get() blocks the thread**", you can just use `subscribeOn()` a different thread. – IgorGanapolsky Oct 05 '17 at 15:18

5 Answers5

403

Futures

Futures were introduced in Java 5 (2004). They're basically placeholders for a result of an operation that hasn't finished yet. Once the operation finishes, the Future will contain that result. For example, an operation can be a Runnable or Callable instance that is submitted to an ExecutorService. The submitter of the operation can use the Future object to check whether the operation isDone(), or wait for it to finish using the blocking get() method.

Example:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures were introduced in Java 8 (2014). They are in fact an evolution of regular Futures, inspired by Google's Listenable Futures, part of the Guava library. They are Futures that also allow you to string tasks together in a chain. You can use them to tell some worker thread to "go do some task X, and when you're done, go do this other thing using the result of X". Using CompletableFutures, you can do something with the result of the operation without actually blocking a thread to wait for the result. Here's a simple example:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava is whole library for reactive programming created at Netflix. At a glance, it will appear to be similar to Java 8's streams. It is, except it's much more powerful.

Similarly to Futures, RxJava can be used to string together a bunch of synchronous or asynchronous actions to create a processing pipeline. Unlike Futures, which are single-use, RxJava works on streams of zero or more items. Including never-ending streams with an infinite number of items. It's also much more flexible and powerful thanks to an unbelievably rich set of operators.

Unlike Java 8's streams, RxJava also has a backpressure mechanism, which allows it to handle cases in which different parts of your processing pipeline operate in different threads, at different rates.

The downside of RxJava is that despite the solid documentation, it is a challenging library to learn due to the paradigm shift involved. Rx code can also be a nightmare to debug, especially if multiple threads are involved, and even worse - if backpressure is needed.

If you want to get into it, there's a whole page of various tutorials on the official website, plus the official documentation and Javadoc. You can also take a look at some of the videos such as this one which gives a brief intro into Rx and also talks about the differences between Rx and Futures.

Bonus: Java 9 Reactive Streams

Java 9's Reactive Streams aka Flow API are a set of Interfaces implemented by various reactive streams libraries such as RxJava 2, Akka Streams, and Vertx. They allow these reactive libraries to interconnect, while preserving the all important back-pressure.

Malt
  • 28,965
  • 9
  • 65
  • 105
  • 2
    Would be nice to give example code of how Rx does this – Ascendant Nov 23 '16 at 21:01
  • So using Reactive Streams, we can mix RxJava, Akka, and Vertx in one application? – IgorGanapolsky Oct 05 '17 at 16:37
  • 1
    @IgorGanapolsky Yes. – Malt Oct 05 '17 at 16:45
  • 1
    In CompletableFutures we use callbacks methods, these callback methods will also block if output of one method is input of other callback. As future block with Future.get() call. Why it is said that Future.get() is blocking call while CompletableFutures do not block. Please explain – Deepak Jun 13 '18 at 04:34
  • @Deepak Your threads aren't waiting for the future to complete so there's no blocking while the future is executing. – Malt Jun 13 '18 at 07:49
  • @Malt CompletableFutures are running inside the same thread but future will run in multiple threads, like 3 threads and if two threads have finished their work the third thread has not yet finished his work it will block (future.get()). Please rectify me if my understanding is wrong – Deepak Jun 14 '18 at 05:24
  • @Deepak Futures don't "run in threads". They are simply handles - placeholders for results. Regular futures are simple placeholders, they can either contain a result or not. CompletableFutures can also tell the thread that placed the result in the CompletableFuture "go do this other thing too". – Malt Jun 14 '18 at 12:32
  • `System.out.println(f.isDone()); // False` In the above code for `CompletableFutures`, if you place a Thread.sleep(1000) before this line OR you do f.get() then f.isDone() will return True. It just hasn't had time to complete the execution. – Vink Nov 05 '18 at 20:46
  • Can you discuss `CompletableFuture`? this answer doesn't mention it yet – Alexander Mills Feb 19 '19 at 07:05
  • @AlexanderMills The answer does mention CompletableFutures - after regular futures and before RXxJava – Malt Feb 19 '19 at 07:37
  • Ah yes thanks I see it now..if you have a moment I have a related question: https://stackoverflow.com/q/54757857/1223975 – Alexander Mills Feb 19 '19 at 08:06
  • @Malt Great answer. Could you clarify what you mean by "Unlike Futures, which are single-use"? – Federico Sep 22 '19 at 11:12
  • 2
    @Federico Sure. Each `Future` is a placeholder for a *single* result that may or may not have completed yet. If you perform the same operation again, you'll get a new `Future` instance. RxJava deals with *streams* of results which can come at any time. A series of operations can therefore return a single RxJava observable that will pump out a bunch of results. It's a bit like the difference between a single postal envelope and a pneumatic tube that keeps pumping out mail. – Malt Sep 22 '19 at 11:25
  • `System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2` is a blocking call right? – srk Dec 08 '21 at 14:38
  • 1
    @srk yes. It *blocks* until the calculation is complete. – Malt Dec 08 '21 at 15:09
26

I have been working with Rx Java since 0.9, now at 1.3.2 and soon migrating to 2.x I use this in a private project where I already work on for 8 years.

I wouldn't program without this library at all anymore. In the beginning I was skeptic but it is a complete other state of mind you need to create. Quiete difficult in the beginning. I sometimes was looking at the marbles for hours.. lol

It is just a matter of practice and really getting to know the flow (aka contract of observables and observer), once you get there, you'll hate to do it otherwise.

For me there is not really a downside on that library.

Use case: I have a monitor view that contains 9 gauges (cpu, mem, network, etc...). When starting up the view, the view subscribes itselfs to a system monitor class that returns an observable (interval) that contains all the data for the 9 meters. It will push each second a new result to the view (so not polling !!!). That observable uses a flatmap to simultaneously (async!) fetch data from 9 different sources and zips the result into a new model your view will get on the onNext().

How the hell you gonna do that with futures, completables etc ... Good luck ! :)

Rx Java solves many issues in programming for me and makes in a way a lot easier...

Advantages:

  • Statelss !!! (important thing to mention, most important maybe)
  • Thread management out of the box
  • Build sequences that have their own lifecycle
  • Everything are observables so chaining is easy
  • Less code to write
  • Single jar on classpath (very lightweight)
  • Highly concurrent
  • No callback hell anymore
  • Subscriber based (tight contract between consumer and producer)
  • Backpressure strategies (circuit breaker a like)
  • Splendid error handling and recovering
  • Very nice documentation (marbles <3)
  • Complete control
  • Many more ...

Disadvantages: - Hard to test

Hash
  • 4,647
  • 5
  • 21
  • 39
Kristoffer
  • 285
  • 3
  • 3
7

Java's Future is a placeholder to hold something that will be completed in the future with a blocking API. You'll have to use its' isDone() method to poll it periodically to check if that task is finished. Certainly you can implement your own asynchronous code to manage the polling logic. However, it incurs more boilerplate code and debug overhead.

Java's CompletableFuture is innovated by Scala's Future. It carries an internal callback method. Once it is finished, the callback method will be triggered and tell the thread that the downstream operation should be executed. That's why it has thenApply method to do further operation on the object wrapped in the CompletableFuture.

RxJava's Observable is an enhanced version of CompletableFuture. It allows you to handle the backpressure. In the thenApply method (and even with its brothers thenApplyAsync) we mentioned above, this situation might happen: the downstream method wants to call an external service that might become unavailable sometimes. In this case, the CompleteableFuture will fail completely and you will have to handle the error by yourself. However, Observable allows you to handle the backpressure and continue the execution once the external service to become available.

In addition, there is a similar interface of Observable: Flowable. They are designed for different purposes. Usually Flowable is dedicated to handle the cold and non-timed operations, while Observable is dedicated to handle the executions requiring instant responses. See the official documents here: https://github.com/ReactiveX/RxJava#backpressure

RESSY VAN
  • 71
  • 1
  • 1
4

All three interfaces serve to transfer values from producer to consumer. Consumers can be of 2 kinds:

  • synchronous: consumer makes blocking call which returns when the value is ready
  • asynchronous: when the value is ready, a callback method of the consumer is called

Also, communication interfaces differ in other ways:

  • able to transfer single value of multiple values
  • if multiple values, backpressure can be supported or not

As a result:

  • Future transferes single value using synchronous interface

  • CompletableFuture transferes single value using both synchronous and asynchronous interfaces

  • Rx transferes multiple values using asynchronous interface with backpressure

Also, all these communication facilities support transferring exceptions. This is not always the case. For example, BlockingQueue does not.

Alexei Kaigorodov
  • 13,189
  • 1
  • 21
  • 38
3

The main advantage of CompletableFuture over normal Future is that CompletableFuture takes advantage of the extremely powerful stream API and gives you callback handlers to chain your tasks, which is absolutely absent if you use normal Future. That along with providing asynchronous architecture, CompletableFuture is the way to go for handling computation heavy map-reduce tasks, without worrying much about application performance.

asmitB
  • 19
  • 5