0

I have subscribed to an Observable that pushes content at high frequency. The content comes from network I/O, so each push originates on a different thread. Some of my observers want to grab a particular item and then unsubscribe immediately, so that no further content arrives. Sample code:

        IDisposable dsp = null;
        dsp = TargetObservable.Subscribe((incomingContent) =>
        {
            if (incomingContent == "something")
            {
                myList.Add(incomingContent);
                dsp.Dispose();
            }
            else
            {
                otherList.Add(incomingContent);
            }
        });

As written, OnNext is obviously not thread-safe: after the observer receives "something" and just before it calls Dispose(), other content may still arrive and be added to 'otherList', even if I wrap the whole 'OnNext(...)' body in a 'lock(...)'.
This is not what we want. Any ideas on how to avoid it? One way I can think of is to modify the Observable to push content one item at a time (using 'lock'), but that would hurt performance a lot. Thanks.

Shawn

3 Answers

5

To use Rx you need to follow the Rx Guidelines. Your code runs into guideline 4.2, "Assume observer instances are called in a serialized fashion". The solution is to use Synchronize, which basically introduces the lock you want to avoid. If you cannot afford a lock statement in your code, you need to write your own "cheap" synchronization before firing network events into Rx.

With a synchronized sequence you can simplify the code in your OnNext handler by using Rx LINQ operators like TakeWhile:

var subscription = TargetObservable
  .Synchronize()
  .TakeWhile(incomingContent => incomingContent != "something")
  .Subscribe( ... );

or you can create your own operator TakeWhileInclusive that also emits the final item, i.e. the first item for which the predicate is false:

static class ObservableExtensions {

  public static IObservable<TSource> TakeWhileInclusive<TSource>(
       this IObservable<TSource> source, 
       Func<TSource, Boolean> predicate) {
    return Observable
      .Create<TSource>(
        observer => source.Subscribe(
          item => {
            observer.OnNext(item);
            if (!predicate(item))
              observer.OnCompleted();
          },
          observer.OnError,
          observer.OnCompleted
        )
      );
  }
}
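A usage sketch for the operator above, assuming a synchronous stand-in source in place of the real network stream. The class names `TakeWhileInclusiveExtensions` and `TakeWhileInclusiveSketch` and the method `Collect` are hypothetical; the operator body is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class TakeWhileInclusiveExtensions
{
    // Same operator as above, repeated so this sketch is self-contained.
    public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        return Observable.Create<TSource>(observer => source.Subscribe(
            item =>
            {
                observer.OnNext(item);
                if (!predicate(item))
                    observer.OnCompleted();
            },
            observer.OnError,
            observer.OnCompleted));
    }
}

static class TakeWhileInclusiveSketch
{
    // Hypothetical helper: routes items into the two lists from the question.
    public static IDisposable Collect(
        IObservable<string> target, List<string> myList, List<string> otherList)
    {
        return target
            .Synchronize()                               // serialize concurrent OnNext calls
            .TakeWhileInclusive(x => x != "something")   // completes right after "something"
            .Subscribe(x =>
            {
                if (x == "something") myList.Add(x);
                else otherList.Add(x);
            });
    }

    static void Main()
    {
        var myList = new List<string>();
        var otherList = new List<string>();

        // Stand-in for TargetObservable (assumption: the real source is a network stream).
        var target = new[] { "a", "b", "something", "c" }.ToObservable();

        Collect(target, myList, otherList);

        Console.WriteLine("other: " + string.Join(",", otherList));
        Console.WriteLine("mine:  " + string.Join(",", myList));
    }
}
```

Because the sequence completes as soon as "something" has been delivered, nothing after it reaches either list, and no explicit Dispose() call is needed inside the handler.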
meilke
Martin Liversage
  • OK, so my 'GetAndStop' requirement doesn't quite suit the original purpose of Rx. With `Synchronize()` I can make sure all pushes happen one by one. I guess the implementation inside that synchronized Observable could be: before each OnNext, take a `lock`, then check whether the current observer has been disposed, and if not, call the observer's OnNext? – Shawn Sep 16 '13 at 09:12
  • BTW, is it common to call `Dispose()` in an observer's `OnNext(...)`? I am worried about thread safety, since I guess the dispose would try to remove the current observer from the observable's internal list while that list may be under iteration? – Shawn Sep 16 '13 at 09:28
  • @Shawn: Yes, `Synchronize` will ensure that you can safely use Rx operators on your `IObservable` even if the original events are fired asynchronously on many threads. This is mainly achieved by creating an `OnNext` handler that performs `lock (_gate) _observer.OnNext(value);` – Martin Liversage Sep 16 '13 at 09:29
  • 2
    @Shawn: You do not need to dispose the subscription from within the `OnNext` handler. Instead, when `OnCompleted` is fired the subscription will get disconnected from the underlying `IObservable`. In my example `TakeWhile` will fire `OnCompleted` when the predicate becomes false and the subscription will get disconnected. Disposing the subscription is useful if you want to turn it off "from the outside", e.g. during shutdown or similar activities. – Martin Liversage Sep 16 '13 at 09:44
  • OK, so my only choice would be `Synchronize`. My last concern is that the network I/O always arrives on different threads, so the `lock` will **also block** other observers, even those that don't use `Synchronize`, correct? That is too bad for concurrency. – Shawn Sep 16 '13 at 10:06
  • 2
    @Shawn: You can use `ObserveOn` to dispatch incoming events on a different thread. Then the `Synchronize` lock will only be held for the time required to queue up the incoming network packet. In my experience Rx is pretty efficient even though it introduces some overhead to properly handle concurrency. If you consider this overhead too big, you need to handle concurrency using lower-level primitives, with a much higher probability of bugs. It is best to avoid premature optimization, so maybe you should evaluate the cost of taking a lock before spending too much effort avoiding it? – Martin Liversage Sep 16 '13 at 12:16
  • 1
    If you use `ObserveOn`, you may find there is no longer a need for the `Synchronize` method, as all of your `OnNext` calls will be serialized to the `IScheduler` instance. This assumes a single threaded `IScheduler` like the WPF dispatcher or `EventLoopScheduler`. – Lee Campbell Sep 17 '13 at 08:25
  • @LeeCampbell I am just guessing that the logic for pushing to the observers' `OnNext(...)` in an observable is something like `observers.ToList().ForEach((ob)=>{ob.OnNext(...)})`. If so, then once I use `Synchronize`, it would delay delivery to all other observers, correct? – Shawn Sep 18 '13 at 10:06
  • @LeeCampbell: BTW, I created another topic about thread safety in some Rx extension methods, at [link](http://stackoverflow.com/questions/18869221/is-takewhile-and-etc-extension-methods-thread-safe-in-rx-1-0). Please take a look, thanks. – Shawn Sep 18 '13 at 10:10
1

You are mixing some non-Rx code with Rx code. You should avoid disposing of subscriptions within subscribes.

Here's my preferred way of doing what you want:

TargetObservable
    .TakeWhile(x => x != "something")
    .Subscribe(otherList.Add);

TargetObservable
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(myList.Add);

That's it, as both subscriptions will automatically unsubscribe. If your TargetObservable source doesn't generate shared streams, you may need to use Publish to make a shared observable.
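As a rough sketch of the Publish variant, assuming a cold, synchronous stand-in for TargetObservable: `PublishSketch` and `Wire` are hypothetical names, while `Publish`, `Connect`, `TakeWhile`, `Where` and `Take` are the standard Rx operators. A source that fires on many threads would still need `Synchronize()` before `Publish()`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class PublishSketch
{
    // Hypothetical helper: attaches both subscriptions to one shared connection.
    public static IConnectableObservable<string> Wire(
        IObservable<string> source, List<string> otherList, List<string> myList)
    {
        // Publish() makes both observers share a single subscription to the source.
        IConnectableObservable<string> shared = source.Publish();

        shared.TakeWhile(x => x != "something").Subscribe(otherList.Add);
        shared.Where(x => x == "something").Take(1).Subscribe(myList.Add);

        return shared;
    }

    static void Main()
    {
        var otherList = new List<string>();
        var myList = new List<string>();

        // Stand-in for TargetObservable (assumption: the real source is a network stream).
        var source = new[] { "a", "b", "something", "c" }.ToObservable();

        // Nothing flows until Connect() is called, so both observers see every item.
        IDisposable connection = Wire(source, otherList, myList).Connect();

        Console.WriteLine("other: " + string.Join(",", otherList));
        Console.WriteLine("mine:  " + string.Join(",", myList));

        connection.Dispose();
    }
}
```

Subscribing both observers before calling `Connect()` is the point of the design: without `Publish`, a cold source would be subscribed to twice, and the two observers could see different runs of the data.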

Alternatively, you could do this:

TargetObservable
    .Do(x =>
    {
        if (x != "something") otherList.Add(x);
    })
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(x => myList.Add(x));
Enigmativity
-1

Throttle might be able to do the job: http://rxwiki.wikidot.com/101samples#toc30.

meilke