I have the following code structure
Service
public Flowable entryFlow()
{
return Flowable.fromIterable(this::getEntries)
}
Consumer
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
private void onError(Throwable e)
{
subscriptionFinished();
}
private void subscriptionFinished()
{
//
}
I need a way to stop the flowable from fetching and emitting data when the stop method is called.
By doing the following, I noticed that the doOnCancel lambda is not always called.
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.doOnCancel(this::snapshotFinished)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
disposable.dispose();
}
Alternative would be
volatile stopped;
void start()
{
disposable = service
.entryFlow()
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.takeUntil(x -> stopped)
.subscribe(
entry -> ...,
this::onError,
this::subscriptionFinished);
}
void stop()
{
stopped = true;
}
What would be the recommended implementation of start and stop such that the flowable stops emitting and onComplete or a similar method (doOnCancel action?) is called?
Later Edit:
To make my use-case shorter
Is is enough to call disposable.dispose to stop the flowable getting data from iterable and emitting to source? I only have 1 subscriber and need to have either onComplete/onError/other-callback called when the flowable ends regardless of cause.
By other callback I mean one of doOnCancel/doFinally etc.
Thank you