Most of the Flowable.subscribe() overloads return a Disposable, which enables a flow to be cleaned up. I'm in the habit of doing:

Disposable d = Flowable.just(...)
    .map(...)
    .subscribe(
        n -> ...,
        t -> ...,
        () -> ...
    );

// someone clicks "cancel" in another thread
d.dispose();

However, when using .subscribe(Subscriber) the Disposable is not returned. I'd like to use .subscribe(Subscriber) so I can pass in a TestSubscriber to verify behaviour. So how would I dispose the flow in this case?

I searched the Javadoc for appropriate Subscribers. There's DisposableSubscriber, which looks like it would work, but there are two problems:

  1. The class description reads as follows, which suggests cancel() cannot be used from outside a flow:

Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation.

  2. TestSubscriber does not extend DisposableSubscriber.
Progman
Dan Gravell

1 Answer

You can use Flowable.subscribeWith(Subscriber) instead of subscribe, so that your Subscriber is returned, instead of void.

In RxJava 3.x TestSubscriber no longer implements Disposable. It does implement the dispose and isDisposed methods, as defined by BaseTestConsumer, which it extends. However, both of those methods have been made protected, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()/TestSubscriber.isCancelled(), which are public, and are equivalent to dispose()/isDisposed(), so you can use those instead.
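A minimal sketch of that combination, assuming RxJava 3.x is on the classpath. The class name SubscribeWithExample, the helper subscribeDoubled, and the use of a PublishProcessor as the source are illustrative choices, not something from the question:

```java
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class SubscribeWithExample {

    // Hypothetical helper: subscribes a TestSubscriber to a mapped flow and
    // hands it back, so the caller can both assert on and cancel the flow.
    static TestSubscriber<Integer> subscribeDoubled(PublishProcessor<Integer> source) {
        return source
                .map(n -> n * 2)
                .subscribeWith(new TestSubscriber<Integer>());
    }

    public static void main(String[] args) {
        PublishProcessor<Integer> source = PublishProcessor.create();
        TestSubscriber<Integer> ts = subscribeDoubled(source);

        source.onNext(1);      // delivered downstream as 2
        ts.cancel();           // e.g. someone clicks "cancel" in another thread
        source.onNext(2);      // no subscriber is left to receive this

        ts.assertValues(2);    // only the item emitted before cancel() arrived
        System.out.println("cancelled: " + ts.isCancelled());
    }
}
```

Because subscribeWith returns its argument unchanged, the same pattern works with any Subscriber type, including DisposableSubscriber.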

As for the reason Flowable.subscribe does not return a Disposable, this change was made in RxJava 2, to adhere to the Reactive-Streams specification:

Due to the Reactive-Streams specification, Publisher.subscribe returns void ... To remedy this, the method E subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is.

dano
  • Ok, and because the return type is parameterised I can pass a `Subscriber` that has a `dispose` method on it, e.g. `DisposableSubscriber`. However, there's a problem here in that there's no unifying type for `DisposableSubscriber` and `TestSubscriber` - I guess I could wrap the latter in an adapter. – Dan Gravell Jun 24 '20 at 19:42
  • Also - is my concern about the `DisposableSubscriber` docs valid - _Use ... cancel() to cancel the sequence from within an onNext implementation_ - what if I'm doing it outside an `onNext()`? – Dan Gravell Jun 24 '20 at 19:43
  • You would use `DisposableSubscriber.dispose()` outside of `onNext()`. – dano Jun 24 '20 at 19:45
  • re: the `DisposableSubscriber`/`TestSubscriber` type mismatch - I'm not sure what's going on there. In RxJava 2.x, `TestSubscriber` implements `Disposable`. I'm not sure why it no longer does in 3.x. – dano Jun 24 '20 at 19:45
  • [Here's the PR where it was removed](https://github.com/ReactiveX/RxJava/pull/6526). I'm still not sure why `dispose`/`isDisposed` were kept while the `Disposable` interface was removed from the type hierarchy. – dano Jun 24 '20 at 19:49
  • Oh, actually the `dispose` method was made `protected` - so that must be why it no longer implements `Disposable`. However, the `cancel()` method is still public, [and `dispose()` internally just delegates to `cancel()`](https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java#L241). – dano Jun 24 '20 at 19:56
  • Thanks, I guess I'll try writing a couple of adapters and probably still store it typed as a `Disposable` given that's all I want to do to it. – Dan Gravell Jun 24 '20 at 20:00
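
The adapter idea from the comments above might look roughly like the following sketch, assuming RxJava 3.x. The class DisposableAdapters and both helper methods are hypothetical names; Disposable.fromRunnable is the static factory on the 3.x Disposable interface, and wrapping ts::cancel is only one possible way to adapt a TestSubscriber:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class DisposableAdapters {

    // DisposableSubscriber already implements Disposable, so it can be
    // returned as-is; dispose() may be called from outside onNext().
    static <T> Disposable subscribeDisposable(Flowable<T> flow, DisposableSubscriber<T> s) {
        return flow.subscribeWith(s);
    }

    // TestSubscriber is not a Disposable in 3.x, so wrap its public cancel().
    static <T> Disposable subscribeTest(Flowable<T> flow, TestSubscriber<T> ts) {
        flow.subscribe(ts);
        return Disposable.fromRunnable(ts::cancel);
    }

    public static void main(String[] args) {
        PublishProcessor<String> source = PublishProcessor.create();
        TestSubscriber<String> ts = new TestSubscriber<>();

        // Calling code only ever sees a Disposable.
        Disposable d = subscribeTest(source, ts);
        source.onNext("a");
        d.dispose();

        ts.assertValues("a");
        System.out.println("cancelled: " + ts.isCancelled());
    }
}
```

With this shape, production code can pass a DisposableSubscriber and tests can pass a TestSubscriber, while the handle that is stored stays typed as Disposable in both cases.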