4

I have the following code structure

Service

public Flowable entryFlow()
{
    return Flowable.fromIterable(this::getEntries)
}

Consumer

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

private void onError(Throwable e)
{
    subscriptionFinished();
}

private void subscriptionFinished()
{
    //
}

I need a way to stop the flowable from fetching and emitting data when the stop method is called.

By doing the following, I noticed that the doOnCancel lambda is not always called.

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .doOnCancel(this::snapshotFinished)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

Alternative would be

volatile stopped;

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .takeUntil(x -> stopped)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    stopped = true;
}

What would be the recommended implementation of start and stop such that the flowable stops emitting and onComplete or a similar method (doOnCancel action?) is called?

Later Edit:

To make my use-case shorter

Is is enough to call disposable.dispose to stop the flowable getting data from iterable and emitting to source? I only have 1 subscriber and need to have either onComplete/onError/other-callback called when the flowable ends regardless of cause.

By other callback I mean one of doOnCancel/doFinally etc.

Thank you

user4132657
  • 207
  • 1
  • 3
  • 10
  • The only reason `doOnCancel` wouldn't be called if the flow already finished at that time. Why do you need to react to cancellation only? You could consider `doFinally` to react to termination and cancellation. – akarnokd Feb 12 '18 at 19:43
  • doFinally sounds interesting, but from the javadoc I understand that it will be called in addition onComplete / orError. I don't really need to know what caused the flow to end (complete, error, cancel), but I want the end action do be executed exactly once. – user4132657 Feb 12 '18 at 20:14
  • Added a more compact description of my question in the later edit section – user4132657 Feb 13 '18 at 07:27

1 Answers1

2

I would recommend to use the dispose() method. And then just add doOnDispose to trigger your side-effect-code.

Appyx
  • 1,145
  • 1
  • 12
  • 21