0

Since in Hot Observable , we must use backpressure strategy to prevent from crash but why we use backpressure for example in the following examples which are Cold type:

Example 1

// If we do not use backpressure, the program will crash

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        // do smth
                    }
                });

Example 2

Flowable.range(0, 1000000)
    .onBackpressureBuffer()
    .observeOn(Schedulers.computation())
    .subscribe(new FlowableSubscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
        }
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }
        @Override
        public void onError(Throwable t) {
            Log.e(TAG, "onError: ", t);
        }
        @Override
        public void onComplete() {
        }
    });
Progman
  • 16,827
  • 6
  • 33
  • 48
a.rah
  • 21
  • 3
  • It's not the matter of being hot or cold, but if the source does support backpressure fully or not. `interval` prioritizes emitting periodically with accuracy and backpressure would by itself possibly disrupt the period. You have to decide what to do if the consumer can't do its job within the period, hence the `onBackpressure` use. Example 2 makes no sense as `range` fully supports backpressure and unbounding it with `onBackpressureBuffer` results in memory bloat. – akarnokd Nov 15 '22 at 08:16
  • I did not understand correctly about interval .if we use interval ,we must apply backpressure and program doesn't happen crash. here It did not use backpressure and crashed the app https://stackoverflow.com/questions/40323307/observable-vs-flowable-rxjava2 – a.rah Nov 15 '22 at 22:09
  • Not sure what your problem is then. `interval` states its backpressure support is ERROR, http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/annotations/BackpressureKind.html#ERROR , so you need an `onBackpressureX` after it. There is a backpressure-supporting version in this library: https://github.com/akarnokd/RxJavaExtensions#flowablesintervalbackpressure – akarnokd Nov 16 '22 at 08:12

0 Answers0