I am trying to implement event throttling with Reactive Extensions. I have a system where events might be raised at high frequency for a specific user or other entity type. What I need is to delay the event for a specific amount of time and, once the timeout expires, raise the event with the last value.

This is what I have so far:

 private Subject<int> userBalanceObservable = new Subject<int>();

 userBalanceObservable.Sample(TimeSpan.FromSeconds(sampleSeconds))
     .Subscribe(sample => OnRaiseBalanceEvent(sample));

When an event occurs:

userBalanceObservable.OnNext(userId);

Edit

The problem with this approach is that the event is raised only for the last value passed to OnNext. What I actually need is a delay for each value passed to OnNext.

E.g. after OnNext(1), OnNext(2), OnNext(3) I need a delayed call for each of 1, 2 and 3; instead I am getting only the last value, which is 3.

NullReference
  • Next time try to post a Minimal, Complete, and Verifiable Example (http://stackoverflow.com/help/mcve) so that we know exactly what you are trying to achieve. Ideally with unit tests. – Lee Campbell Oct 08 '16 at 03:40
  • You really need to show us all of the code - especially what's happening in `OnRaiseBalanceEvent` and how `userBalanceObservable` gets its values. Just as a small hint though, if you're using a `Subject` you're probably doing something wrong. – Enigmativity Oct 08 '16 at 08:13

2 Answers

Sample publishes the most recent value each time the sampling interval elapses and drops everything else that arrived in between. If that is the behaviour you require, it is fine. See http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample for more information and other ways to slow down the emission rate.

Regarding the new information in your question:

If you want to emit all the values once a certain timeout has elapsed, you can collect them until the timeout fires. (NOTE: you can run out of memory if events keep arriving so frequently that the timeout never fires.)

You can create a Buffer that fills up until a debounce timeout fires (in Rx.NET the debounce operator is called Throttle); see this answer on SO for pointers: How to implement buffering with timeout in RX
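
A minimal sketch of that idea, assuming Rx.NET (System.Reactive) and a 2-second quiet period chosen purely for illustration. The class name BufferUntilQuiet is hypothetical. The buffer is closed by a Throttle on the same source, so a batch is only released once no new value has arrived for the quiet period:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class BufferUntilQuiet // hypothetical name, for illustration only
{
    static void Main()
    {
        var userBalanceObservable = new Subject<int>();
        var quietPeriod = TimeSpan.FromSeconds(2); // assumed value

        // Publish shares one subscription between the buffer and its closing signal.
        // Throttle fires only after quietPeriod has passed without a new value,
        // and that signal closes the current buffer.
        var batches = userBalanceObservable.Publish(shared =>
            shared.Buffer(() => shared.Throttle(quietPeriod)));

        using (batches.Subscribe(batch =>
            Console.WriteLine("[{0}]", string.Join(", ", batch))))
        {
            userBalanceObservable.OnNext(1);
            userBalanceObservable.OnNext(2);
            userBalanceObservable.OnNext(3);

            // Keep the process alive long enough for the quiet period to elapse.
            Thread.Sleep(3000);
        }
    }
}
```

As the note above warns, the buffer keeps growing for as long as values arrive faster than the quiet period, so this only suits bursty sources.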

Mark van Straten
  • Actually, it seems that it doesn't really work as I would expect. What I need is a delayed sample for each specific item, not for any item in general. E.g. after OnNext(1), OnNext(2), OnNext(3) I would expect a delayed call for each of 1, 2 and 3; instead I am getting only the last value, which is 3. Do Reactive Extensions support such functionality? – NullReference Oct 07 '16 at 15:04
  • Try Delay. See http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Delay – Peter Bons Oct 07 '16 at 19:38
  • @NullReference - That's not what you asked for in the question. Can you please update the question? – Enigmativity Oct 08 '16 at 08:12

Doesn't plain buffering work? The only "issue" is that OnRaiseBalanceEvent has to work with a list instead of a single value, but all problems in computer science can be solved by another level of indirection ;)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Reactive.Subjects;
using System.Reactive.Linq;

namespace ConsoleApplication1
{
  class Program
  {
    static void Main(string[] args)
    {

      Subject<int> userBalanceObservable = new Subject<int>();
      userBalanceObservable.Buffer(TimeSpan.FromSeconds(2)) // get a list of items every 2 seconds
                           .Subscribe(sampleList => ProcessSamples(sampleList));

      int cont = 0;

      // Push two values per second until a key is pressed.
      while (!Console.KeyAvailable)
      {
        userBalanceObservable.OnNext(cont);
        cont++;
        userBalanceObservable.OnNext(cont);
        cont++;
        Thread.Sleep(1000);
      }

    }

    private static void ProcessSamples(IList<int> sampleList)
    {
      Console.WriteLine("[{0}]", string.Join(", ", sampleList.ToArray()));
    }

  }
}
jlvaquero
  • Hmm, I think Buffer can do the job. I can use .Where(x => x.Count > 0) to silence empty sequences and Distinct() to get just one instance of each value. – NullReference Oct 13 '16 at 08:17
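
A rough sketch of the combination described in that last comment, assuming Rx.NET (System.Reactive) and the same 2-second window as the answer. The class name DistinctBatches and the printed message are hypothetical. One assumption worth flagging: here Distinct() is applied to each buffered list (LINQ to Objects) and the result is flattened with SelectMany, because calling Distinct() directly on the stream of lists would compare whole lists, not the user ids inside them.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class DistinctBatches // hypothetical name, for illustration only
{
    static void Main()
    {
        var userBalanceObservable = new Subject<int>();

        using (userBalanceObservable
            .Buffer(TimeSpan.FromSeconds(2))        // collect values per 2-second window
            .Where(batch => batch.Count > 0)        // skip windows with no events
            .SelectMany(batch => batch.Distinct())  // one notification per distinct id in the window
            .Subscribe(userId =>
                Console.WriteLine("Raise balance event for user {0}", userId)))
        {
            userBalanceObservable.OnNext(1);
            userBalanceObservable.OnNext(1); // duplicate within the same window
            userBalanceObservable.OnNext(2);

            // Keep the process alive until the first window has closed.
            Thread.Sleep(3000);
        }
    }
}
```

Deduplication here is per window only: the same id can be raised again in a later window, which matches the "delay, then raise once" behaviour asked for in the question.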