26

What are all the similarities and diferences between them, It looks like Java Parallel Stream has some of the element available in RXJava, is that right?

Mohan Narayanaswamy
  • 2,149
  • 6
  • 33
  • 40

3 Answers3

41

Rx is an API for creating and processing observable sequences. The Streams API is for processing iterable sequences. Rx sequences are push-based; you are notified when an element is available. A Stream is pull-based; it "asks" for items to process. They may appear similar because they both support similar operators/transforms, but the mechanics are essentially opposites of each other.

Mike Strobel
  • 25,075
  • 57
  • 69
  • 1
    Yes, but every iterable sequence is also observable, isn't it ? Java 8 Streams seem pretty limited compared to RxJava, since RxJava can also operate on iterables. – Kr0e Apr 27 '15 at 18:33
  • 1
    The `Iterable` interface itself only supports pull-based iteration, so iterables are observable only if you use some sort of 'cold' observable adapter, e.g., one provided by RxJava. – Mike Strobel May 11 '15 at 16:24
  • Control flow is opposite: Rx calls you, but you call Stream – Kirill Gamazkov Mar 02 '16 at 21:10
2

Stream is pull based. Personally I feel it is Oracle's answer to C# IEnumerable<>, LINQ and their related extension methods.

RxJava is push based, which I am not sure whether it is .NET's reactive extensions released first or Rx project goes live first.

Conceptually they are totally different and their applications are also different.

If you are implementing a text searching program on a text file that's so large that you can't load everything and fit into memory, you would probably want to use Stream since you can easily determine if you have next lines available by keeping track of your iterator, and scan line by line.

Another application of Stream would be parallel calculations on a collection of data. Nowadays every machine has multiple cores but you won't know easily exactly how many cores your client machine are available. It would be hard to pre-configure the number of threads to operate. So we use parallel stream and let the JVM to determine that for us (supposed to be more optimal).

On the other hand, if you are implementing a program that takes an user input string and searches for available videos on the web, you would use RX since you won't even know when the program will start getting any results (or receive an error of network timeout). To make your program responsive you have to let the program "subscribe" for network updates and complete signals.

Another common application of Rx is on GUI to "detect user finished input" without requiring the user to click a button to confirm. For example you want to have a text field whenever the user stops typing you start searching without waiting a "Search button" click. In this case you use Rx to create an observable on "KeyEvent" and "throttle" (e.g. at 500ms), so that whenever he stopped typing for 500ms you receive an onNext() to "start searching".

Paul
  • 21
  • 2
1

There is also a difference in threading.

Stream#parallel splits the sequence into parts, and each part is processed in the separate thread.

Observable#subscribeOn and Observable#observeOn are both 'move' execution to another thread, but don't split the sequence.

In other words, for any particular processing stage:

  • parallel Stream may process different elements on different threads
  • Observable will use one thread for the stage

E. g. we have Observable/Stream of many elements and two processing stages:

Observable.create(...)
    .observeOn(Schedulers.io())
    .map(x -> stage1(x))
    .observeOn(Schedulers.io())
    .map(y -> stage2(y))
    .forEach(...);

Stream.generate(...)
    .parallel()
    .map(x -> stage1(x))
    .map(y -> stage2(y))
    .forEach(...);

Observable will use no more than 2 additional threads (one per stage), so no two x'es or y's are accessed by different threads. Stream, on the countrary, may span each stage across several threads.

Kirill Gamazkov
  • 3,277
  • 1
  • 18
  • 22
  • There are different ways in `rxJava` to split a sequence into parallel threads. Some of the techniques involve [using `observeOn()` within a `flatMap()`](https://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html), [the new `.parallel()` and `sequential()` operators](https://dzone.com/articles/rxjava-idiomatic-concurrency-flatmap-vs-parallel), etc... `rxJava` is far more complex. Also, using `forEach()` doesn't look like using reactive programming as it was meant to be. `flatMap()` should be more appropriate in most cases. – Vrakfall Jul 04 '18 at 15:25
  • At the moment of writing, there were no .parallel() operator (i've used Rx 1.x). Next, what's wrong with .forEach(...) as the very last sequence stage? There must be terminal operation like .forEach(...) or .subscribe(...) for the sequence to get started – Kirill Gamazkov Jul 20 '18 at 07:24
  • Nothing wrong with the .forEach() nor the .subscribe(). I just wanted to point out that an Observable isn't force to use only one thread per stage and it was already true at the time of your writing. My first link was written before your answer and used operators to achieve parallelization. (Even in the comments, on RxJava dev showed another way which led to [another post on that blog](http://tomstechnicalblog.blogspot.com/2016/02/rxjava-maximizing-parallelization.html).) – Vrakfall Jul 23 '18 at 21:30
  • To continue my previous comment: `.subscribeOn(Schedulers.io())` is a quick example of executing a "stage" concurrently as it creates a Thread for each new item that passes through the `map`. Also, you use `observeOn` multiple times which, if I remember well, violates the "reactive contract" OR the recommendations. I think it overwrites the previous ones every time you call it and it's advised to only call it at the very end, before the subscription. All this `observeOn` part has to be taken with a grain of salt as I don't remember it well and can't find the link back to what I'm referring to – Vrakfall Jul 23 '18 at 21:42
  • Also, it seems you misunderstand its (`observeOn`) use as it only changes the Thread in which the **subscriber/observer** is executed. So, in the end only every iteration of your `forEach` is executed in a new Thread each time. I don't know how an Observable reacts when no `subscribeOn` is called but a `observeOn` is but I guess, from my understanding, that the whole "pipe" will be executed on the same thread here, with a new thread for every item. I can't be sure tho. – Vrakfall Jul 23 '18 at 21:46
  • I found back the [guidelines document I was thinking about](http://go.microsoft.com/fwlink/?LinkID=205219) when I was talking about "violating some sort of contract". I couldn't find it back so I didn't link it. Yes it's for C# but I think most of the rules can apply for most Rx implementations. What I was talking about is at the point `5.5` on page 13. – Vrakfall Jul 26 '18 at 20:22
  • 1. AFAIK, neither Schedulers.io() nor Schedulers.newThread() does not create a new thread per sequence item. It does create one per sequence (stage), but not per sequence item. If such a scheduler is shared amongst N sequence stages, it will spawn N threads, no matter how many items are in each sequence. It can be checked by .map() stage printing thread id. – Kirill Gamazkov Jul 27 '18 at 08:01
  • 2. Multiple `.opserveOn()` stages is code smell only if your stage is relatively simple. On the other hand, if you're writing higher-order chain which orchestrates simpler chains, it can be OK to specify which part of the pipe is executed on which scheduler – Kirill Gamazkov Jul 27 '18 at 08:06
  • 3. .observeOn() affects not the whole pipe but downstream stages only (until another .observeOn() call). .subscribeOn() is rarely useful, it specifies on which scheduler to call observable's subscriber consumer callback, i.e. is rather low-level and subtle thing. – Kirill Gamazkov Jul 27 '18 at 08:27
  • You're mistaking `observeOn` with `subscribeOn` and the other way around. [Look at the documentation](http://reactivex.io/documentation/operators/subscribeon.html). `"SubscribeOn specify the Scheduler on which an Observable will operate"`. I'd need a lot more than simple comments to explain you that without mistaking those operators, you can easily use `subscribeOn` to schedule any part of the chain, especially using it on an observable inside a `flatMap`, for example, with an example an outputs as a proof. – Vrakfall Jul 28 '18 at 12:25
  • On top of that, [`observeOn` **IS** the one that defines were the consumer is executed!](http://reactivex.io/documentation/operators/observeon.html) Retry you example with `subscribeOn` and see the difference already. If you still want examples, either we move into chat or you make a new question and link it to me. – Vrakfall Jul 28 '18 at 12:26