160

Are Java 8 streams similar to RxJava observables?

Java 8 stream definition:

Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements.

Tagir Valeev
rahul.ramanujam
  • 8
    FYI, there are proposals to introduce more RxJava-like classes in JDK 9. http://jsr166-concurrency.10961.n7.nabble.com/jdk9-Candidate-classes-Flow-and-SubmissionPublisher-td11967.html – John Vint May 13 '15 at 15:06
  • @JohnVint What's the status of this proposal? Will it actually take flight? – IgorGanapolsky Mar 08 '16 at 13:45
  • 2
    @IgorGanapolsky Oh yes, it definitely looks like it will make it into jdk9. http://cr.openjdk.java.net/~martin/webrevs/openjdk9/jsr166-jdk9-integration/. There is even a port for RxJava to Flow https://github.com/akarnokd/RxJavaUtilConcurrentFlow. – John Vint Mar 08 '16 at 15:26
  • I know this is a really old question, but I recently attended this great talk by Venkat Subramaniam, which has an insightful take on the subject and is updated for Java 9: https://www.youtube.com/watch?v=kfSSKM9y_0E. Could be interesting for people delving into RxJava. – Pedro Jul 19 '18 at 12:40

7 Answers

168

Short answer

All sequence/stream processing libraries offer very similar APIs for building pipelines. The differences are in the APIs for handling multi-threading and for composing pipelines.

Long answer

RxJava is quite different from Stream. Of all JDK things, the closest to rx.Observable is perhaps the Stream + CompletableFuture combo (which comes at the cost of dealing with an extra monad layer, i.e. having to handle conversion between Stream<CompletableFuture<T>> and CompletableFuture<Stream<T>>).
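To make the "extra monad layer" concrete, here is a minimal sketch of the conversion from Stream<CompletableFuture<T>> to CompletableFuture<Stream<T>>. The helper name `sequence` is made up for illustration and is not a JDK method; the sketch only uses `CompletableFuture.allOf` and `join`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FutureSequence {

    // Hypothetical helper (not part of the JDK): turns a stream of futures
    // into a single future that completes with a stream of the results.
    static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
        // Materialize first, because a Stream can be traversed only once.
        List<CompletableFuture<T>> list = futures.collect(Collectors.toList());
        return CompletableFuture
                .allOf(list.toArray(new CompletableFuture<?>[0]))
                // Every future is already complete here, so join() does not block.
                .thenApply(ignored -> list.stream().map(CompletableFuture::join));
    }

    public static void main(String[] args) {
        Stream<CompletableFuture<Integer>> pending = Stream.of(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.completedFuture(3));
        List<Integer> values = sequence(pending).join().collect(Collectors.toList());
        System.out.println(values); // [1, 2, 3]
    }
}
```

The result order follows the order of the input stream, not the order in which the futures complete.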

There are significant differences between Observable and Stream:

  • Streams are pull-based, Observables are push-based. This may sound too abstract, but it has significant consequences that are very concrete.
  • A Stream can only be used once, while an Observable can be subscribed to many times.
  • Stream#parallel() splits the sequence into partitions; Observable#subscribeOn() and Observable#observeOn() do not. It is tricky to emulate Stream#parallel() behavior with Observable. Observable once had a .parallel() method, but it caused so much confusion that .parallel() support was moved to a separate repository: ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava. More details are in another answer.
  • Stream#parallel() does not let you specify which thread pool to use, unlike most RxJava methods, which accept an optional Scheduler. Since all stream instances in a JVM use the same fork-join pool, adding .parallel() can accidentally affect the behaviour of another module of your program.
  • Streams lack time-related operations like Observable#interval(), Observable#window() and many others; this is mostly because Streams are pull-based, so upstream has no control over when to emit the next element downstream.
  • Streams offer a restricted set of operations in comparison with RxJava. E.g. JDK 8 Streams lack cut-off operations (takeWhile(), takeUntil()); the workaround using Stream#anyMatch() is limited: it is a terminal operation, so you can't use it more than once per stream.
  • As of JDK 8, there's no Stream#zip() operation, which is quite useful sometimes.
  • Streams are hard to construct by yourself, whereas an Observable can be constructed in many ways. EDIT: As noted in the comments, there are ways to construct a Stream. However, since there's no non-terminal short-circuiting, you can't, e.g., easily generate a Stream of lines in a file (JDK provides Files#lines() and BufferedReader#lines() out of the box, though, and other similar scenarios can be managed by constructing a Stream from an Iterator).
  • Observable offers a resource management facility (Observable#using()); you can wrap an IO stream or a mutex with it and be sure that the user will not forget to free the resource: it is disposed automatically when the subscription terminates. Stream has an onClose(Runnable) method, but the registered handler only runs when close() is called, either manually or via try-with-resources. E.g. you have to keep in mind that Files#lines() must be enclosed in a try-with-resources block.
  • Observables are synchronized all the way through (I didn't actually check whether the same is true for Streams). This spares you from wondering whether basic operations are thread-safe (the answer is always 'yes', unless there's a bug), but the concurrency-related overhead will be there whether your code needs it or not.
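Three of the Stream-side points above (single use, construction from an Iterator, and close handlers that need try-with-resources) can be sketched with the JDK alone. The class and method names below are illustrative assumptions; the list iterator merely stands in for something like a database cursor.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class StreamBasics {

    // Wraps any Iterator (e.g. a hypothetical database cursor) in a sequential Stream.
    static <T> Stream<T> fromIterator(Iterator<T> it) {
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        return StreamSupport.stream(sp, false);
    }

    // Returns true if traversing the same Stream a second time fails.
    static boolean secondTraversalFails() {
        Stream<String> s = Stream.of("a", "b");
        s.forEach(x -> { });
        try {
            s.forEach(x -> { });
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Returns true if try-with-resources ran the handler registered via onClose.
    static boolean closeHandlerRuns() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<Integer> s = fromIterator(Arrays.asList(1, 2, 3).iterator())
                .onClose(() -> closed.set(true))) {
            s.forEach(x -> { });
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(fromIterator(Arrays.asList(1, 2, 3).iterator())
                .map(x -> x * 10)
                .collect(Collectors.toList())); // [10, 20, 30]
        System.out.println(secondTraversalFails()); // true
        System.out.println(closeHandlerRuns());     // true
    }
}
```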

Round-up

RxJava differs from Streams significantly. The real RxJava alternatives are other implementations of Reactive Streams, e.g. the relevant part of Akka.

Update

There's a trick for using a non-default fork-join pool with Stream#parallel; see Custom thread pool in Java 8 parallel stream.
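A minimal sketch of that trick, as far as I understand it: start the parallel stream from inside a task that runs on your own ForkJoinPool. Note the assumption that this relies on observed fork-join behaviour rather than on anything the Stream API documentation promises, so treat it as a workaround. The class and method names are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

final class CustomPoolParallelStream {

    // Runs a parallel stream from inside a task of a dedicated ForkJoinPool.
    // Assumption: the stream's subtasks are forked into the pool of the calling
    // worker thread; this is observed behaviour, not a documented guarantee.
    static int sumOfSquares(int upTo, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<Integer> task =
                    () -> IntStream.rangeClosed(1, upTo).parallel().map(i -> i * i).sum();
            // join() waits for the result without checked exceptions.
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10, 2)); // 385
    }
}
```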

Update

All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.

Kirill Gamazkov
  • 2
    Why are Streams hard to construct? According to this article, it seems easy: http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html – IgorGanapolsky Mar 08 '16 at 14:06
  • 2
    There are quite a number of classes that have a 'stream' method: collections, input streams, directory files, etc. But what if you want to create a stream from a custom loop - say, iterating over a database cursor? The best way I've found so far is to create an Iterator, wrap it with a Spliterator, and finally invoke StreamSupport#stream. Too much glue for a simple case IMHO. There is also Stream.iterate, but it produces an infinite stream. The only way to cut off the stream in that case is Stream#anyMatch, but it's a terminal operation, thus you can't separate stream producer and consumer – Kirill Gamazkov Mar 08 '16 at 17:47
  • 2
    RxJava has Observable.fromCallable, Observable.create and so on. Or you can safely produce infinite Observable, then say '.takeWhile(condition)', and you're ok with shipping this sequence to the consumers – Kirill Gamazkov Mar 08 '16 at 17:49
  • 1
    Streams are not hard to construct by yourself. You can simply call `Stream.generate()` and pass your own `Supplier` implementation, just one simple method from which you provide the next item in the stream. There are loads of other methods. To easily construct a sequence `Stream` that depends on previous values you can use the `iterate()` method, every `Collection` has a `stream()` method and `Stream.of()` constructs a `Stream` from varargs or an array. Finally, `StreamSupport` has support for more advanced stream creation using spliterators or for streams of primitive types. – jbx Nov 01 '16 at 15:09
  • 1
    "Streams are lacking cut-off operations (`takeWhile()`, `takeUntil()`);" - JDK9 has these, I believe, in [takeWhile()](https://docs.oracle.com/javase/9/docs/api/java/util/stream/Stream.html#takeWhile-java.util.function.Predicate-) and [dropWhile()](https://docs.oracle.com/javase/9/docs/api/java/util/stream/Stream.html#dropWhile-java.util.function.Predicate-) – Honinbo Shusaku Nov 30 '17 at 16:09
  • Stream's operation set is not as rich as that of RxJava, Akka Streams, Project Reactor, etc. takeWhile is just an example – Kirill Gamazkov Nov 30 '17 at 19:07
  • @BrianJosephSpinos well, in the context of my answer, it was a silly try to construct some analogy and express Rx via standard JDK terms. If you want a definition of monad, try https://stackoverflow.com/questions/2704652/monad-in-plain-english-for-the-oop-programmer-with-no-fp-background or https://en.wikipedia.org/wiki/Monad_(functional_programming). The tricky part here is that monad is so very abstract that it's hard to explain in simple (concrete) words – Kirill Gamazkov Sep 15 '19 at 23:45
  • RxJava treats Errors as first-class citizen with "onError", just like "onNext" for Data. This is not the case in Java Streams. – Vipul Jain Jun 14 '20 at 16:20
50

Java 8 Stream and RxJava look pretty similar. They have look-alike operators (filter, map, flatMap...) but are not built for the same usage.

You can perform asynchronous tasks using RxJava.

With a Java 8 stream, you'll traverse the items of your collection.

You can do pretty much the same thing in RxJava (traverse the items of a collection) but, as RxJava is focused on concurrent tasks, it uses synchronization, latches and so on. So the same task may be slower with RxJava than with a Java 8 stream.

RxJava can be compared to CompletableFuture, except that it can compute more than just one value.

dwursteisen
  • 12
    It's worth noting that your statement about stream traversal is only true for a non-parallel stream. `parallelStream` supports similar synchronization of simple traversals/maps/filtering etc. – John Vint May 13 '15 at 15:07
  • 2
    I don't think "So the same task using RxJava may be slower than with Java 8 stream." holds true universally; it depends heavily on the task at hand. – daschl Feb 09 '16 at 09:26
  • 1
    I am glad you said **same task using RxJava may be slower than with Java 8 stream**. This is a very important distinction that many RxJava users aren't aware of. – IgorGanapolsky Mar 08 '16 at 13:49
  • RxJava is synchronous by default. Do you have any benchmarks to support your statement that it may be slower? – Marcin Koziński May 20 '16 at 19:42
  • 6
    @marcin-koziński you can check this benchmark : https://twitter.com/akarnokd/status/752465265091309568 – dwursteisen Jul 23 '16 at 12:05
  • 1
    It'd be nice to have a list of use cases recommending J8 or Rx streams usage for each. – Stephane Dec 02 '19 at 17:43
  • "So the same task using RxJava may be slower than with Java 8 stream" is dependent on the volume of the data you are processing, say you are processing an array of 20 elements then the sequential stream performs better than the parallel one. Please correct me if i am wrong. – raj240 Sep 20 '21 at 07:21
39

There are a few technical and conceptual differences. For example, Java 8 streams are single-use, pull-based, synchronous sequences of values, whereas RxJava Observables are re-observable, adaptively push-pull-based, potentially asynchronous sequences of values. RxJava is aimed at Java 6+ and works on Android as well.

akarnokd
31

Java 8 Streams are pull based. You iterate over a Java 8 stream consuming each item. And it could be an endless stream.

An RxJava Observable is push-based by default. You subscribe to an Observable and you get notified when the next item arrives (onNext), when the stream is completed (onCompleted), or when an error occurs (onError). Because an Observable delivers onNext, onCompleted and onError events, you can do some powerful things, like combining different Observables into a new one (zip, merge, concat). Other things you can do include caching, throttling, ... And it uses more or less the same API in different languages (RxJava, Rx in C#, RxJS, ...)

By default RxJava is single threaded. Unless you start using Schedulers, everything will happen on the same thread.
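To illustrate the push model without pulling in RxJava itself, here is a hand-rolled sketch. The `Observer` interface and the `pushMapped` method are hypothetical stand-ins that only mirror the three events named above; they are not the RxJava API. As in the description, everything runs on the caller's thread because no scheduler is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class PushSketch {

    // Hypothetical observer contract with the three events described above.
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    // Hypothetical push source: the source drives the observer, on the caller's thread.
    // A failure in the mapping step is delivered as onError and ends the sequence.
    static <T, R> void pushMapped(Iterable<T> items, Function<T, R> mapper, Observer<R> observer) {
        for (T item : items) {
            R mapped;
            try {
                mapped = mapper.apply(item);
            } catch (RuntimeException e) {
                observer.onError(e);
                return;
            }
            observer.onNext(mapped);
        }
        observer.onCompleted();
    }

    // Records every notification as text so the order of events is visible.
    static List<String> record(List<Integer> divisors) {
        List<String> events = new ArrayList<>();
        pushMapped(divisors, d -> 10 / d, new Observer<Integer>() {
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onCompleted() { events.add("completed"); }
            @Override public void onError(Throwable error) {
                events.add("error " + error.getClass().getSimpleName());
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList(1, 2, 5))); // [next 10, next 5, next 2, completed]
        System.out.println(record(Arrays.asList(5, 0, 1))); // [next 2, error ArithmeticException]
    }
}
```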

Yogesh Umesh Vaity
Bart De Neuter
  • In Stream you have forEach, which is pretty much the same as onNext – paul Dec 12 '15 at 18:19
  • Actually, Streams are usually terminal. "Operations that close a stream pipeline are called terminal operations. They produce a result from a pipeline such as a List, an Integer, or even void (any non-Stream type)." ~http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html – IgorGanapolsky Mar 08 '16 at 13:55
30

The existing answers are comprehensive and correct, but a clear example for beginners is lacking. Allow me to put some concrete behind terms like "push/pull-based" and "re-observable". Note: I hate the term Observable (it's a stream for heaven's sake), so will simply refer to J8 vs RX streams.

Consider a list of integers,

digits = [1,2,3,4,5]

A J8 Stream is a utility to process the collection. For example, the even digits can be extracted as,

evens = digits.stream().filter(x -> x % 2 == 0).collect(Collectors.toList())

This is basically Python's map, filter, reduce, a very nice (and long overdue) addition to Java. But what if digits weren't collected ahead of time - what if the digits were streaming in while the app was running - could we filter the evens in real time?

Imagine a separate thread process is outputting integers at random times while the app is running (--- denotes time)

digits = 12345---6------7--8--9-10--------11--12

In RX, even can react to each new digit and apply the filter in real time

even = -2-4-----6---------8----10------------12

There's no need to store input and output lists. If you want an output list, no problem that's streamable too. In fact, everything is a stream.

evens_stored = even.collect()  

This is why terms like "stateless" and "functional" are more associated with RX
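The "define the filter first, attach the data later" idea can be sketched with plain JDK callbacks. This is not RX; the `filter` helper and the class name are assumptions made for illustration, and the array passed to `evensSoFar` simply simulates digits arriving one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

final class LiveEvens {

    // Hypothetical helper: a push-style stage that forwards only matching values.
    static IntConsumer filter(IntPredicate keep, IntConsumer downstream) {
        return value -> {
            if (keep.test(value)) {
                downstream.accept(value);
            }
        };
    }

    // Feeds the arrivals one by one into a pipeline that was built beforehand.
    static List<Integer> evensSoFar(int... arrivals) {
        List<Integer> seen = new ArrayList<>();
        // The pipeline is defined before any digit exists.
        IntConsumer even = filter(x -> x % 2 == 0, seen::add);
        for (int digit : arrivals) {
            even.accept(digit); // each digit is handled the moment it "arrives"
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(evensSoFar(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12));
        // [2, 4, 6, 8, 10, 12]
    }
}
```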

Adam Hughes
  • But 5 is not even… And that looks like J8 Stream is synchronous, while Rx Stream is asynchronous? – Franklin Yu May 05 '18 at 04:17
  • 1
    @FranklinYu thanks, I fixed the 5 typo. I'd think less in terms of synchronous vs asynchronous, although it may be correct, and more in terms of imperative vs functional. In J8, you collect all your items first, then apply the filter second. In RX you define the filter function independent of the data, and then associate it with an even source (a live stream, or a java collection)... it's an entirely different programming model – Adam Hughes Jan 10 '19 at 22:22
4

RxJava is also closely related to the reactive streams initiative and considers itself a simple implementation of the reactive streams API (e.g. compared to the Akka streams implementation). The main difference is that reactive streams are designed to be able to handle back pressure; if you have a look at the reactive streams page, you will get the idea. They describe their goals pretty well, and the streams are also closely related to the reactive manifesto.

The Java 8 streams are pretty much the implementation of an unbounded collection, pretty similar to the Scala Stream or the Clojure lazy seq.
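As a small illustration of the "unbounded, lazy" aspect, the sketch below builds an infinite stream with Stream.iterate and only materializes the prefix that limit() asks for. The class and method names are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyStream {

    // Takes the first n elements of an infinite stream of powers of two.
    static List<Integer> powersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2) // conceptually endless
                .limit(n)                    // short-circuits after n elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```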

Community
Niclas Meier
2

Java 8 Streams enable processing of really large collections efficiently, while leveraging multicore architectures. In contrast, RxJava is single-threaded by default (without Schedulers). So RxJava won't take advantage of multi-core machines unless you code that logic yourself.
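For reference, a Stream only spreads work across cores once .parallel() is requested. The sketch below, with made-up names, records which threads process the elements; a sequential stream stays on the calling thread, while the set of threads used by a parallel stream depends on the machine and the common pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final class StreamThreads {

    // Collects the names of the threads that processed the stream's elements.
    static Set<String> threadsUsed(boolean parallel) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream numbers = IntStream.range(0, 10_000);
        if (parallel) {
            numbers = numbers.parallel();
        }
        numbers.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Sequential: only the calling thread shows up.
        System.out.println(threadsUsed(false));
        // Parallel: typically the caller plus common-pool workers (machine dependent).
        System.out.println(threadsUsed(true));
    }
}
```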

IgorGanapolsky
  • 4
    Stream is single-threaded by default as well, unless you invoke .parallel(). Also, Rx gives more control over concurrency. – Kirill Gamazkov Jan 30 '17 at 15:47
  • @KirillGamazkov Kotlin Coroutines Flow (based on Java8 Streams) now supports structured concurrency: https://kotlinlang.org/docs/reference/coroutines/flow.html#flows – IgorGanapolsky Dec 02 '19 at 18:34
  • True, but I said nothing about Flow and structured concurrency. My two points were: 1) both Stream and Rx are single-threaded unless you explicitly change that; 2) Rx gives you fine-grained control on which step to perform on which thread pool, in contrast to Streams only allowing you to say "make it parallel somehow" – Kirill Gamazkov Dec 15 '19 at 21:15
  • I don't really get the point of the question "what do you need thread pool for". As you've said, "to enable processing of really large collections efficiently". Or maybe I want IO-bound part of task to run on separate thread pool. I don't think I've understood the intention behind your question. Try again? – Kirill Gamazkov Dec 30 '19 at 21:12
  • 1
    Static methods in the Schedulers class allow you to get predefined thread pools as well as create one from an Executor. See http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#from-java.util.concurrent.Executor- – Kirill Gamazkov Jan 09 '20 at 21:45
  • @KirillGamazkov The point is to leverage multi-core. Not multi-thread pools. – IgorGanapolsky Oct 15 '20 at 16:35