
Recently I realized that I don't understand how RxJava2 backpressure works.

I wrote a small test and expected it to fail with a MissingBackpressureException:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

System.out shows the following:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999

Processed:0
Processed:1
Processed:2
...
Processed:9999

Why doesn't it produce a MissingBackpressureException?

I expected that e.onNext(i) would put each item into the buffer of ObservableObserveOn, and that once the buffer's size exceeded static final int BUFFER_SIZE = Math.max(16, Integer.getInteger("rx2.buffer-size", 128).intValue()); a MissingBackpressureException would be thrown.

That doesn't happen. Does the buffer grow automatically? If not, where are the items stored?

tir38
Rostyslav Roshak
  • `Observable`s in RxJava2 don't support backpressure, only `Flowable`s do. – Tassos Bassoukos Jun 21 '17 at 11:42
  • I know that they don't support backpressure, but I thought "doesn't support" meant that a MissingBackpressureException would be thrown, not that the buffer would grow automatically. – Rostyslav Roshak Jun 21 '17 at 12:08
  • @RostyslavRoshak "Observable doesn't support backpressure" means that when a source emits items faster than the consumer can handle them, the items are buffered without bound, and this continues until an OutOfMemoryError is thrown. For a Flowable, by contrast, the buffer is bounded, and a MissingBackpressureException is thrown once it is full. – HiddenDroid Dec 21 '19 at 22:48

1 Answer


That's because in RxJava2 backpressure was moved to Flowable only, see here.
If you switch to Flowable with BackpressureStrategy.MISSING, you will get the exception.
That also means that in your case you do have a buffer that grows automatically. From the observeOn docs:

Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
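
To make the difference concrete without pulling in RxJava, here is a minimal plain-Java sketch of the two buffering behaviours. It is only a model: BufferModel and fill are hypothetical names, ArrayBlockingQueue stands in for a bounded Flowable-style buffer and ArrayDeque for the unbounded Observable-style one, and no consumer drains the queue (the slow consumer from the question is approximated by one that never catches up).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model, not RxJava code: compares a bounded and an unbounded hand-off buffer.
class BufferModel {

    // Offers `count` items to `buffer` while nothing drains it.
    // Returns how many items were accepted before the buffer rejected one.
    static int fill(Queue<Integer> buffer, int count) {
        for (int i = 0; i < count; i++) {
            if (!buffer.offer(i)) {
                // A bounded buffer is full here; this is the point where a
                // Flowable pipeline would signal MissingBackpressureException.
                return i;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Bounded buffer with the default size of 128 mentioned in the question.
        int bounded = fill(new ArrayBlockingQueue<Integer>(128), 10_000);
        // Unbounded buffer: grows as needed, so every item is accepted.
        int unbounded = fill(new ArrayDeque<Integer>(), 10_000);

        System.out.println("bounded accepted:   " + bounded);   // 128
        System.out.println("unbounded accepted: " + unbounded); // 10000
    }
}
```

The bounded queue rejects the 129th item, while the unbounded one keeps all 10,000 in memory, which is why the test in the question passes instead of failing.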

yosriz
  • Thank you. Could you please explain the reason for having public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) in Observable if the buffer is unbounded? Am I correct that it is the increment by which the buffer grows? – Rostyslav Roshak Jun 21 '17 at 12:17
  • Yes, I think so too. According to the documentation, this is the configurable "island" size, i.e. the incremental step by which the buffer grows. – yosriz Jun 21 '17 at 12:32
  • @RostyslavRoshak Indeed, this bufferSize parameter is only the incremental step of the buffer. The Observable buffer's size is 128 elements by default, and it grows as needed. – HiddenDroid Dec 21 '19 at 23:03
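
The "island" growth described in these comments can be sketched as an unbounded queue built from fixed-size array chunks that are linked together. This is a simplified, single-threaded model under stated assumptions: IslandQueue and its methods are hypothetical names, and RxJava's real queue is a lock-free structure whose details differ. The sketch only shows why bufferSize acts as a growth step rather than a limit.

```java
// Hypothetical, single-threaded model of an unbounded queue that grows in "islands".
final class IslandQueue<T> {
    private final int islandSize;
    private Object[] head;      // island currently being read
    private int headIndex;
    private Object[] tail;      // island currently being written
    private int tailIndex;
    private int islands = 1;    // number of islands allocated so far

    IslandQueue(int islandSize) {
        this.islandSize = islandSize;
        // Each island has islandSize item slots plus one extra slot for the link to the next island.
        head = tail = new Object[islandSize + 1];
    }

    // Never rejects an item: when the current island is full, a new one is linked in.
    void offer(T item) {
        if (tailIndex == islandSize) {
            Object[] next = new Object[islandSize + 1];
            tail[islandSize] = next;
            tail = next;
            tailIndex = 0;
            islands++;
        }
        tail[tailIndex++] = item;
    }

    // Returns the oldest item, or null if the queue is empty.
    @SuppressWarnings("unchecked")
    T poll() {
        if (head == tail && headIndex == tailIndex) {
            return null;
        }
        if (headIndex == islandSize) {
            // Current island is fully consumed: follow the link to the next one.
            head = (Object[]) head[islandSize];
            headIndex = 0;
        }
        T item = (T) head[headIndex];
        head[headIndex++] = null; // release the reference
        return item;
    }

    int islandsAllocated() {
        return islands;
    }

    public static void main(String[] args) {
        IslandQueue<Integer> queue = new IslandQueue<>(128);
        for (int i = 0; i < 10_000; i++) {
            queue.offer(i);
        }
        // 10,000 items at 128 per island need ceil(10000 / 128) = 79 islands.
        System.out.println("islands: " + queue.islandsAllocated()); // 79
        System.out.println("first:   " + queue.poll());             // 0
    }
}
```

With an island size of 128, the 10,000 items from the question end up spread over 79 linked chunks, so a smaller bufferSize means more, smaller allocations rather than an earlier failure.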