In my chat client application, I have a Subject that gets OnNext-ed whenever a connection to the server drops or a connection attempt fails. A subscription on that subject makes a connection attempt each time it is notified.
Because of how my chat application and the server connection work, several notifications can sometimes arrive at around the same time: while the subscription is still processing one, another is queued behind it, and that one NEEDS TO BE IGNORED.
I am currently using an external, thread-safe flag to tell my subscription whether it should ignore further notifications.
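For context, here is a minimal sketch of what such a flag could look like, assuming it is backed by an int that is read and written atomically. The class name ReconnectState is hypothetical; only the member names IsRetryInProcess and SetRetryInProcessThreadSafe come from the snippet below, and the real implementation may differ.

```csharp
using System.Threading;

// Hypothetical holder for the flag; only the member names are taken from the question.
public sealed class ReconnectState
{
    // 0 = idle, 1 = a retry is currently running.
    private int retryInProcess;

    // Atomic read of the current state.
    public bool IsRetryInProcess => Volatile.Read(ref retryInProcess) == 1;

    // Atomic write of the new state.
    public void SetRetryInProcessThreadSafe(bool value) =>
        Interlocked.Exchange(ref retryInProcess, value ? 1 : 0);
}
```

The subscription then checks and toggles the flag like this: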
    reconnectSubject
        // Drop notifications that arrive while a retry is already running.
        .Where(x => this.IsRetryInProcess == false)
        .Select(x =>
        {
            this.SetRetryInProcessThreadSafe(true);
            var result = MakeConnectionAttemptAndReturnResult();
            this.SetRetryInProcessThreadSafe(false);
            return result;
        })
        // Only a failed attempt should trigger another retry.
        .Where(x => x.IsSuccessful == false)
        .Subscribe(x => reconnectSubject.OnNext(Unit.Default));
This works, but it lacks coherence and encapsulation: I have mixed two worlds together, the Rx pipeline and external mutable state. Is there any operator in Observable that can help me achieve this without an external flag? Maybe some sort of observable that requires ACKs?