
I have an event source that is fired by network I/O very frequently. Because of the underlying design, the event is raised on a different thread each time. I wrapped this event in Rx with Observable.FromEventPattern(...), and I'm using TakeWhile(predicate) to filter some special event data. Now I have some concerns about its thread safety: TakeWhile(predicate) works as a "hit and mute" (once the predicate fails, the sequence completes and nothing further is passed on), but can that still be guaranteed when events arrive concurrently? I guess the underlying implementation could look something like this (I can't read the actual source code since it's too complicated...):

    public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
    {
        ISubject<TSource> takeUntilObservable = new TempObservable<TSource>();
        IDisposable dps = null;
        // 0: takeUntilObservable still active; 1: predicate failed, OnCompleted already sent and subscription disposed.
        int state = 0;
        dps = source.Subscribe(
             (s) =>
             {
                 /* NOTE: the 'hit and mute' here is still not thread safe. One thread may enter 'else' and be inside
                  * CompareExchange while, at the same time, another thread has already passed predicate(...) and is calling OnNext(...).
                  * So the CompareExchange here mainly prevents OnCompleted() and Dispose() from being called more than once.
                  */

                 if (predicate(s) && state == 0)
                 {
                     takeUntilObservable.OnNext(s);
                 }
                 else
                 {
                     // != 0 means already disposed and OnCompleted sent; avoids repeated calls from parallel threads.
                     if (0 == Interlocked.CompareExchange(ref state, 1, 0))
                     {
                         try
                         {
                             takeUntilObservable.OnCompleted();
                         }
                         finally
                         {
                             dps.Dispose();
                         }
                     }
                 }
             },
             // Subscribe's overload order is (onNext, onError, onCompleted), so the error handler comes second.
             (ex) => { takeUntilObservable.OnError(ex); },
             () =>
             {
                 try
                 {
                     takeUntilObservable.OnCompleted();
                 }
                 finally { dps.Dispose(); }
             });
        return takeUntilObservable;
    }

That TempObservable is just a simple implementation of ISubject.
If my guess is reasonable, then thread safety can't be guaranteed, meaning some unexpected event data may still reach OnNext(...) while the 'mute' is still in progress. So I wrote a simple test to verify this, but contrary to my expectation, the results are always positive (no unexpected value ever shows up):

    public class MultipleThreadEventSource
    {
        public event EventHandler OnSthNew;
        int concurrentCount = 1000;
        public void Start()
        {
            for (int i = 0; i < this.concurrentCount; i++)
            {
                int j = i;
                ThreadPool.QueueUserWorkItem((state) =>
                {
                    // copy the delegate so a concurrent unsubscribe can't null it between the check and the call
                    var handler = this.OnSthNew;
                    if (handler != null)
                        handler(j, null);
                });
            }
        }
    }

    [TestMethod()]
    public void MultipleThreadEventSourceTest()
    {
        int loopTimes = 10;
        int onCompletedCalledTimes = 0;
        for (int i = 0; i < loopTimes; i++)
        {
            MultipleThreadEventSource eventSim = new MultipleThreadEventSource();
            var host = Observable.FromEventPattern(eventSim, "OnSthNew");
            host.TakeWhile(p => { return int.Parse(p.Sender.ToString()) < 110; }).Subscribe((nxt) =>
            {
                // try to print the unexpected values, but I never saw it happen!
                if (int.Parse(nxt.Sender.ToString()) >= 110)
                {
                    this.testContextInstance.WriteLine(nxt.Sender.ToString());
                }
            }, () => { Interlocked.Increment(ref onCompletedCalledTimes); });
            eventSim.Start();
        }

        // simply wait for everything to finish.
        Thread.Sleep(60000);
        this.testContextInstance.WriteLine("onCompletedCalledTimes: " + onCompletedCalledTimes);
    }

Before I did the testing, some friends here suggested that I try Synchronize<TSource> or ObserveOn to make it thread safe. So, any thoughts on my reasoning above, and why isn't the issue reproduced?

Shawn
  • [Are IEnumerable Linq methods thread-safe?](http://stackoverflow.com/questions/11103779/are-ienumerable-linq-methods-thread-safe) No, they are not. [How can I make them thread safe?](http://stackoverflow.com/a/1605761/284240) – Tim Schmelter Sep 18 '13 at 10:01
  • @TimSchmelter: My question is about Reactive Extensions; I don't think the links you posted are relevant. – Shawn Sep 18 '13 at 10:17



As with your other question, the answer remains the same: in Rx you should assume that observers are called in a serialized fashion.

To provide a better answer: originally the Rx team ensured that observable sequences were thread safe; however, for well-behaved, well-designed applications the performance penalty was unnecessary, so a decision was taken to remove the thread safety and, with it, the performance cost. To let you opt back into thread safety, you can apply the Synchronize() method, which serializes all OnNext/OnError/OnCompleted calls. This doesn't mean they will be called on the same thread, but your OnNext method won't be called while another call is still being processed.
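The serialization guarantee described above can be illustrated with a small hand-written wrapper. This is a minimal sketch assuming only the BCL IObserver<T> interface (no Rx package); SerializingObserver and ConcurrencyProbe are hypothetical names, and this is not how Rx actually implements Synchronize(). The wrapper funnels every notification through one lock and drops anything that arrives after a terminal message, which is exactly what makes a "check predicate, then complete" step atomic.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not Rx's source): every OnNext/OnError/OnCompleted goes through
// one lock, so the wrapped observer never runs two notifications at the same time.
public sealed class SerializingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new object();
    private bool _stopped; // true once OnError or OnCompleted has been forwarded

    public SerializingObserver(IObserver<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_stopped) _inner.OnNext(value); // values after completion are silently dropped
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_stopped) return;
            _stopped = true;
            _inner.OnCompleted();
        }
    }
}

// Hypothetical demo observer: records the maximum number of threads inside OnNext at once.
public sealed class ConcurrencyProbe : IObserver<int>
{
    private int _inside;
    public int MaxConcurrent;
    public int Count; // plain increment: only correct because calls are serialized

    public void OnNext(int value)
    {
        int now = Interlocked.Increment(ref _inside);
        int seen;
        // lock-free "max" update so the probe itself is accurate under concurrency
        while (now > (seen = Volatile.Read(ref MaxConcurrent)) &&
               Interlocked.CompareExchange(ref MaxConcurrent, now, seen) != seen) { }
        Thread.SpinWait(1000); // widen the window so any overlap would be observable
        Count++;
        Interlocked.Decrement(ref _inside);
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

In Rx itself you would not write this by hand; where it is available, the built-in equivalent is to call source.Synchronize() before TakeWhile.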

The bad news: from memory, this happened in Rx 2.0, and you are specifically asking about Rx 1.0. (I am not sure Synchronize() even exists in 1.x.)

So if you are on Rx v1, you have a blurry picture of what is thread safe and what isn't. I am pretty sure the Subjects are safe, but I can't be sure about factory methods like FromEventPattern.

My recommendation is: if you need to ensure thread safety, serialize your data pipeline. The easiest way to do this is to use a single-threaded IScheduler implementation, e.g. DispatcherScheduler or an EventLoopScheduler instance.
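Why a single-threaded scheduler serializes the pipeline can be seen in a tiny event loop: one dedicated thread runs queued work strictly one item at a time, in FIFO order. The sketch below uses only the BCL; MiniEventLoop is a hypothetical class that illustrates the idea and is not Rx's EventLoopScheduler, which does more (for example, delayed and periodic scheduling).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical minimal event loop, not Rx's EventLoopScheduler: one dedicated thread
// executes queued work items one at a time, in the order they were enqueued.
public sealed class MiniEventLoop : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public MiniEventLoop(string threadName)
    {
        _thread = new Thread(Run) { Name = threadName, IsBackground = true };
        _thread.Start();
    }

    public int ThreadId => _thread.ManagedThreadId;

    // Safe to call from any thread; the work itself always runs on the loop thread.
    public void Schedule(Action work) => _queue.Add(work);

    private void Run()
    {
        // Blocks while the queue is empty; ends once CompleteAdding was called and the queue is drained.
        foreach (Action work in _queue.GetConsumingEnumerable())
        {
            work();
        }
    }

    // Stops accepting work, waits for queued items to finish, then releases the queue.
    // Do not call this from the loop thread itself: Join would wait on the current thread forever.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

Because only one thread ever runs the queued work, an operator such as TakeWhile fed from this loop can never see two values at once, so its check-then-complete step cannot race.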

Some good news is that when I wrote the book on Rx, it targeted v1, so this section is very relevant to you: http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html

So if your query right now looked like this:

    Observable.FromEventPattern(....)
              .TakeWhile(x=>x>5)
              .Subscribe(....);

To ensure that the pipeline is serialized you can create an EventLoopScheduler (at the cost of dedicating a thread to this):

    var scheduler = new EventLoopScheduler();
    Observable.FromEventPattern(....)
              .ObserveOn(scheduler)
              .TakeWhile(x=>x>5)
              .Subscribe(....);
Lee Campbell
  • Yes, `Synchronize()` does exist in Rx 1.0. Since my test always passes, it unexpectedly behaves as if it were thread safe. Maybe, as you said, in Rx 1.0 MS already takes a `lock` internally. Could you talk a bit more about `EventLoopScheduler`? I couldn't find more info on the web; how is it different from creating a new thread? – Shawn Sep 23 '13 at 06:10
  • I detail `EventLoopScheduler` and the other `IScheduler`s for Rx v1 over at IntroToRx.com. If you are confident, then jump straight to http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler (however, it's probably best to read from the start). – Lee Campbell Sep 23 '13 at 08:52
  • Yes, I read your whole tutorial and it is awesome, but I don't quite understand `EventLoopScheduler`. What's the difference from `NewThreadScheduler`? It seems both of them create a new thread. – Shawn Sep 24 '13 at 01:23
  • The EventLoopScheduler creates a new thread whose lifetime you control. The NewThreadScheduler will create a new thread every time, unless it is a nested schedule call. Basically, if you want to dedicate a single thread to something, you use an EventLoopScheduler. Ideally you would even provide a name for the thread too. – Lee Campbell Sep 25 '13 at 21:24
  • In fact, to better answer your question, the `ObserveOn` operator itself will ensure that your sequence is serialized. This way you don't specifically need an EventLoopScheduler, but if you did use one, the sequence would be serialized and it would also be deterministic which thread the values are propagated on. – Lee Campbell Aug 05 '14 at 21:41
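A dedicated, named loop thread combined with ObserveOn might look like the sketch below. It assumes the System.Reactive package (Rx 2.0 or later, where EventLoopScheduler implements IDisposable and has a constructor taking a Func<ThreadStart, Thread> factory); it will not compile against Rx v1 as-is. NamedLoopDemo and the thread name "NetworkEvents" are hypothetical, and Observable.Range on the thread pool merely stands in for the real network event source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Assumes System.Reactive (Rx 2.0+); NamedLoopDemo and "NetworkEvents" are illustrative names.
public static class NamedLoopDemo
{
    public static IList<string> Run()
    {
        // Thread factory: EventLoopScheduler creates its thread through this and starts it itself,
        // so the factory must return an unstarted thread.
        using (var scheduler = new EventLoopScheduler(start => new Thread(start)
        {
            Name = "NetworkEvents",
            IsBackground = true
        }))
        {
            return Observable.Range(0, 1000, ThreadPoolScheduler.Instance) // stand-in for a busy event source
                .ObserveOn(scheduler)                   // everything below runs on the named loop thread
                .TakeWhile(x => x < 110)                // same cut-off as the test in the question
                .Select(_ => Thread.CurrentThread.Name) // record which thread delivered each value
                .ToList()                               // IObservable<IList<string>> with a single list
                .Wait();                                // block until OnCompleted, return that list
        }
    }
}
```

Because the scheduler is disposed when Run returns, its thread's lifetime is explicitly owned by the caller, which is the difference from NewThreadScheduler discussed in the comments.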