
In my chat client application, I have a Subject that gets OnNext-ed whenever the connection to the server drops or a connection attempt fails. A subscription on that Subject attempts to reconnect each time it is notified.

Because of how my chat application and the server connection work, several notifications can arrive at around the same time. While the subscription is still processing one notification, another one gets queued, and that one NEEDS TO BE IGNORED.

I am currently using a thread-safe flag that lives outside the observable pipeline to tell my subscription whether it should ignore further notifications.

reconnectSubject
    .Where(x => this.IsRetryInProcess == false)
    .Select(x =>
    {
        // Flag a retry as in progress so overlapping notifications are filtered out above.
        this.SetRetryInProcessThreadSafe(true);
        var result = MakeConnectionAttemptAndReturnResult();
        this.SetRetryInProcessThreadSafe(false);
        return result;
    })
    .Where(x => x.IsSuccessful == false)
    .Subscribe(x => reconnectSubject.OnNext(Unit.Default));

This works, but it is neither coherent nor well encapsulated: I have mixed the observable world with external mutable state. Is there any operator in Observable that can help me achieve this without an external flag? Maybe some sort of observable that requires ACKs?

fahadash
  • Is the [`ExhaustMap`](https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net) operator of any help? – Theodor Zoulias Jan 05 '22 at 07:21
  • It's usually a code smell when you call an `OnNext` in a `Subscribe`. There's almost always a cleaner more idiomatic way to do it. Your code seems to have the following semantics: `Observable.Defer(() => Observable.Start(() => MakeConnectionAttemptAndReturnResult())).Retry()`. It would be great to see how you know when the server drops the connection. – Enigmativity Jan 06 '22 at 23:18
  • I'd also love to see a [mcve] to know when `IsRetryInProcess` is actually important. – Enigmativity Jan 06 '22 at 23:18
  • @Enigmativity It is an event that gets fired. Somehow that event gets fired multiple times by a library that is not in my control. I got that solved through `Throttle` – fahadash Jan 07 '22 at 19:25
  • @fahadash - It seems like if you had an observable based on the event, ran it through `ExhaustMap` and then used my query, that might work. `var subscription = disconnectEventObservable.ExhaustMap(u => Observable.Defer(() => Observable.Start(() => MakeConnectionAttemptAndReturnResult())).Retry()).Subscribe();` – Enigmativity Jan 07 '22 at 22:25
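
For reference, the `ExhaustMap` idea from the comments could be sketched roughly as follows. This is only a minimal sketch under stated assumptions: `ExhaustMap` is not a built-in Rx.NET operator, so the extension method here is a hypothetical helper written for illustration, and `MakeConnectionAttempt` is a stand-in for the question's `MakeConnectionAttemptAndReturnResult()`. The busy flag still exists, but it is created per subscription inside the operator, so the calling code no longer needs `IsRetryInProcess`.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ExhaustMapExtensions
{
    // Hypothetical helper, not part of Rx.NET: projects each item to an inner
    // sequence and drops items that arrive while an inner sequence is still active.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector)
    {
        return Observable.Defer(() =>
        {
            int busy = 0; // 0 = idle, 1 = inner sequence active (one flag per subscription)
            return source
                // Let an item through only if we can flip the flag from idle to busy.
                .Where(_ => Interlocked.CompareExchange(ref busy, 1, 0) == 0)
                .Select(item => Observable
                    .Defer(() => selector(item))
                    // Reset the flag when the inner sequence completes, fails, or is disposed.
                    .Finally(() => Interlocked.Exchange(ref busy, 0)))
                .Merge();
        });
    }
}

public static class Program
{
    // Assumed stand-in for the question's MakeConnectionAttemptAndReturnResult().
    private static bool MakeConnectionAttempt()
    {
        Thread.Sleep(200); // simulate a slow connection attempt
        return true;
    }

    public static void Main()
    {
        var reconnectSubject = new Subject<Unit>();

        using var subscription = reconnectSubject
            .ExhaustMap(_ => Observable.Start(() => MakeConnectionAttempt()))
            .Subscribe(ok => Console.WriteLine($"attempt finished, success = {ok}"));

        reconnectSubject.OnNext(Unit.Default); // starts an attempt on the thread pool
        reconnectSubject.OnNext(Unit.Default); // dropped: an attempt is still in flight

        Thread.Sleep(500); // keep the process alive long enough for the attempt to finish
    }
}
```

Here the second `OnNext` is ignored because the first attempt has not finished yet. Retrying after a failed attempt is deliberately left out of the sketch; the `Retry()` composition suggested in the comments, or a check on the result, would have to be added inside the selector.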
