I was trying to use the Chunkify method to collect all pending items, but I ran into a problem: it consumes all the resources of one thread. Does anyone know why this happens and how I can prevent it?

In fact, my goal is to create a "spam filter" for my event, keeping only the last 5 values and ignoring more than two consecutive repetitions.

An example of how the problem occurs:

Attention: the code below is contrived. It exists only to demonstrate the problem and to show that the event can be raised from multiple threads. Please run the code below and watch the output window to see the problem.

    [TestMethod]
    public void ThreadSpinning()
    {
        var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
            .Select((item, countRaise) => countRaise)
            .Chunkify()
            .ToObservable(Scheduler.Default)
            .Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
            .Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
            .Select(a => a.raiseItems)
            .Where(a => a.Any())
            .Do(obj =>
            {
                Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
                                Environment.NewLine + Environment.NewLine);

                Thread.Sleep(700);
            })
            .Subscribe();

        Thread.Sleep(2000);

        var handle = new ManualResetEventSlim(false);
        ThreadPool.QueueUserWorkItem(r =>
            {
                Thread.Sleep(500);

                Task.Factory.StartNew(() =>
                {
                    OnRaise();
                    OnRaise();

                }).Wait();

                OnRaise();
                Thread.Sleep(500);
                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(1500);
                OnRaise();
                OnRaise();

                Thread.Sleep(500);

                OnRaise();

                Thread.Sleep(250);

                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(500);
                Task.Factory.StartNew(OnRaise).Wait();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(2000);

                handle.Set();
            });

        handle.Wait();
        Thread.Sleep(3000); 
        subs.Dispose();

        Thread.Sleep(1000); 
    }

    private event EventHandler Raise;

    protected virtual void OnRaise()
    {
        EventHandler handler = Raise;
        if (handler != null) 
            handler(this, EventArgs.Empty);
    }

    public static string Dump<T>(IEnumerable<T> source)
    {
        return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
    }
J. Lennon

1 Answer

I'm not sure exactly what you're trying to do, but there are a few issues with your code:

  • You're using Chunkify to convert from an IObservable to an IEnumerable, but then you convert it back to an IObservable, which is a bit weird.

  • The statements

              .Select((item, count) => new { item, count })
              .Do(obj => Trace.Write(obj.count + " | "))
              .Select(a => a.item)
              .Where(a => a.Any())
              .Do(obj => Trace.WriteLine("Do something.. " + obj.Dump()))
    

    are a lot of code that seems to be doing transforms just for debugging purposes. You can just write all of the debugging code in one statement lambda inside one Do call.

  • You should not be creating new Random objects, but rather reusing one and calling Next() on it: http://msdn.microsoft.com/en-us/library/h343ddh9.aspx

  • You are using Thread.Sleep excessively, and doing it within an observable sequence is a code smell. Try to convert your code to use the time operators, like Throttle and Delay. You might also want to create your sequence using Observable.Generate.

  • It's possible that Chunkify is not actually what you want. Have you considered the Buffer operator? There is a good list of time operators here: http://introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#TimeShiftedSequences

  • To test your code, you don't need to actually raise your event. You can test your subscription code by generating an observable sequence and subscribing to it. For instance, if you have a method SubscribeToMyEvent(IObservable<T>), you can pass it either an observable created with FromEventPattern or one created with something like Interval or Generate.
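
To make the Chunkify and Buffer points concrete, here is a minimal sketch, assuming the System.Reactive package. The class name `ChunkifyVersusBuffer`, the `Subject<int>` standing in for your event, the `HistoricalScheduler` used as a virtual clock, and the 700 ms window are all my own illustrative choices, not something taken from your code. The first method shows that the enumerator returned by Chunkify does not block: every `MoveNext` returns whatever arrived since the previous call, possibly nothing, so anything that iterates it in a loop (such as `ToObservable`) keeps polling. The second method shows how Buffer hands you the same kind of batch only when a time window closes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ChunkifyVersusBuffer
{
    // Polls the Chunkify enumerator a fixed number of times and records the size
    // of each chunk. MoveNext does not wait for new items, so polls made while
    // the source is idle come back as empty chunks.
    public static List<int> ChunkSizesWhenPolled(int polls)
    {
        var source = new Subject<int>();   // hypothetical stand-in for the event
        var sizes = new List<int>();
        using (var chunks = source.Chunkify().GetEnumerator())
        {
            source.OnNext(1);
            source.OnNext(2);
            for (var i = 0; i < polls; i++)
            {
                chunks.MoveNext();
                sizes.Add(chunks.Current.Count);
            }
        }
        return sizes;
    }

    // Buffers items into 700 ms windows on a virtual clock and drops empty
    // windows, so the subscriber only runs when there is something to process.
    public static List<string> BufferedBatches()
    {
        var scheduler = new HistoricalScheduler();   // virtual time, no real waiting
        var source = new Subject<int>();
        var batches = new List<string>();
        var window = TimeSpan.FromMilliseconds(700); // assumed window length

        using (source.Buffer(window, scheduler)
                     .Where(batch => batch.Count > 0)
                     .Subscribe(batch => batches.Add(string.Join(", ", batch))))
        {
            source.OnNext(0);
            source.OnNext(1);
            scheduler.AdvanceBy(window);   // first window closes holding two items
            scheduler.AdvanceBy(window);   // second window is empty and filtered out
            source.OnNext(2);
            scheduler.AdvanceBy(window);   // third window closes holding one item
        }
        return batches;
    }

    public static void Main()
    {
        Console.WriteLine("Chunk sizes: " + string.Join(", ", ChunkSizesWhenPolled(3)));
        foreach (var batch in BufferedBatches())
            Console.WriteLine("Batch: " + batch);
    }
}
```

With the real event you would swap the `Subject<int>` for your `FromEventPattern(...)` sequence and drop the scheduler argument so Buffer runs on real time. Whether a fixed time window matches your "spam filter" requirement is something only your scenario can answer.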

What exactly is your scenario? What is triggering the events? How exactly are you trying to change your event stream? It can be helpful to draw marble diagrams (e.g. http://channel9.msdn.com/blogs/j.van.gogh/reactive-extensions-api-in-depth-marble-diagrams-select--where) in order to think about your algorithm.

lindydonna
  • Yes, I can use Buffer (with a buffer-closing selector), but it takes a lot of extra code. What I wrote was just an example of the problem (so it does not make much sense); the focus of the topic is the bug, not what I want to achieve. Could you please check other topics about this problem: http://stackoverflow.com/a/11887174/1128106 and http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx (a page discussing the Collect / Chunkify methods). – J. Lennon Aug 22 '13 at 00:17
  • The SO answer you linked to has some problems, as noted in the comments below it. I know how chunkify works, but I still don't understand why you are using Thread.Sleep instead of operators like Throttle and Delay. That's why you should explain your scenario more clearly, otherwise your question is too vague. – lindydonna Aug 22 '13 at 21:09
  • OK, I'll write it again. I edited my code for better understanding; the "Thread.Sleep()" is used to simulate some processing (which can take some time). – J. Lennon Aug 22 '13 at 23:18