I am developing a solution in which changes occur at a high rate, for example changes to a user. I need to record each change, delay the notification, and then return the distinct changes, e.g. the ids of the users that were changed. I have come up with the following code using Rx:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);

This seems to work correctly, and I get all the values that were added by

userEventSubject.OnNext(userId);

My question is: can I make the changes distinct? When there are multiple OnNext calls with the same user id, I don't want the resulting buffer to contain duplicates. Of course I can de-duplicate the values in the Subscribe handler, but I wondered whether this can be done at the Rx level. I tried the Distinct method but still got all the values.

Since all I want is to track the changes made during the delay, pass them to the Subscribe handler, and start over, do I need to clear the userEventSubject?

Manfred Radlwimmer
NullReference

2 Answers

4

If you use the Observable.Distinct method on the buffered sequence, you'll be checking for distinct IList<int> instances, which indeed isn't the behaviour you want.

Instead, you need to apply the distinct operation to the elements within each buffer.

You can do this for every buffer using the Observable.Select method:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)
    .Select(buffer => buffer.Distinct()) // distinct elements in each buffer
    .Subscribe(OnEventBufferProcess);

Be aware that whenever you use Distinct() methods, you need to be careful about how equality is checked. If you are working with reference types, you will likely need to implement some kind of equality comparer. In this case it's not a problem, because you're working with ints.
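
To illustrate the point about reference types, here is a minimal sketch of the same pipeline with a custom equality comparer. It assumes the System.Reactive NuGet package; `UserChange`, `UserIdComparer` and `DistinctPerBufferDemo` are hypothetical names that do not appear in the question, and the one-second window stands in for EVENT_BUFFER_DELAY.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical event type (an assumption, not from the question).
public sealed class UserChange
{
    public UserChange(int userId, string field) { UserId = userId; Field = field; }
    public int UserId { get; }
    public string Field { get; }
}

// Treats two changes as equal when they refer to the same user id.
public sealed class UserIdComparer : IEqualityComparer<UserChange>
{
    public bool Equals(UserChange x, UserChange y) =>
        ReferenceEquals(x, y) || (x != null && y != null && x.UserId == y.UserId);

    public int GetHashCode(UserChange change) => change.UserId.GetHashCode();
}

public static class DistinctPerBufferDemo
{
    // Returns the user ids delivered in each non-empty buffer.
    public static List<int[]> Run()
    {
        var batches = new List<int[]>();
        var changes = new Subject<UserChange>();

        using (changes
            .Buffer(TimeSpan.FromSeconds(1))             // stand-in for EVENT_BUFFER_DELAY
            .Where(buffer => buffer.Count > 0)
            .Select(buffer => buffer.Distinct(new UserIdComparer()).ToList())
            .Subscribe(batch => batches.Add(batch.Select(c => c.UserId).ToArray())))
        {
            changes.OnNext(new UserChange(3, "name"));
            changes.OnNext(new UserChange(5, "email"));
            changes.OnNext(new UserChange(3, "email"));  // same user as the first change
            changes.OnCompleted();                       // completing flushes the open buffer
        }

        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine(string.Join(", ", batch));
    }
}
```

Without the comparer, the two changes for user 3 would be treated as different objects and both would stay in the buffer. Overriding Equals and GetHashCode on the type itself would be an alternative to passing a comparer.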

TheInnerLight
  • Yes, I understand that I will need an equality comparer for reference types. One question though: would it be safe for OnEventBufferProcess to be async? If I understand correctly, while OnEventBufferProcess is still working, any calls to OnNext will be blocked? – NullReference Feb 16 '17 at 11:55
  • @NullReference Yes, you are correct with regard to blocking. There are a few questions about subscribing via async methods; the solution depends on the exact behaviour you're looking for. See here for example: http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async – TheInnerLight Feb 16 '17 at 12:03
  • What is needed is to do the processing on a separate thread (or threads) so that OnNext is never blocked. As I understand it, ObserveOn lets me specify the scheduler on which callbacks should run. So ObserveOn(ThreadPoolScheduler.Instance) will make the callbacks run on one of the ThreadPool threads and will not block as long as the ThreadPool has an available thread? From a test I can see that it seems to work as expected :) – NullReference Feb 16 '17 at 13:28
  • Just to be clear, using `ObserveOn` will create an implicit queue. This queue will hold the callbacks that have yet to be processed, allowing the source observable to be unblocked. It generally won't introduce parallel processing (unless you make it), if that is what you wanted. The subscriber will still get callbacks in a serialized fashion. – Lee Campbell Feb 17 '17 at 23:52
  • @LeeCampbell Indeed, in some cases I would want to allow more callbacks to be processed in parallel. What would be a correct way to achieve that? I thought of firing a task that would do the processing, but I'm not sure if that would be the right way to go. – NullReference Feb 22 '17 at 12:48
  • I prefer not to mix Rx with Task (lazy vs eager, explicit concurrency vs implicit), but if you did, project to a method that returns a task and then call ToObservable on that. Instead of using Select for your projection, use SelectMany. – Lee Campbell Feb 22 '17 at 13:26
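
The SelectMany suggestion from the last comment could be sketched roughly as follows. This assumes the System.Reactive NuGet package; `ProcessAsync` is a hypothetical async stand-in for OnEventBufferProcess, and the one-second window again stands in for EVENT_BUFFER_DELAY.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class AsyncBufferDemo
{
    // Hypothetical async handler (an assumption): returns how many ids it processed.
    public static async Task<int> ProcessAsync(IList<int> userIds)
    {
        await Task.Delay(50); // simulated asynchronous work
        return userIds.Count;
    }

    public static async Task<IList<int>> RunAsync()
    {
        var userEventSubject = new Subject<int>();

        // ToTask subscribes right away, so the values pushed below are observed.
        Task<IList<int>> results = userEventSubject
            .Buffer(TimeSpan.FromSeconds(1))
            .Where(buffer => buffer.Count > 0)
            .Select(buffer => buffer.Distinct().ToList())
            // SelectMany flattens the task-backed observables; handlers for
            // different buffers may overlap and finish out of order.
            .SelectMany(buffer => ProcessAsync(buffer).ToObservable())
            .ToList()
            .ToTask();

        userEventSubject.OnNext(1);
        userEventSubject.OnNext(1);
        userEventSubject.OnNext(2);
        userEventSubject.OnCompleted(); // flushes the open buffer

        return await results;
    }

    public static async Task Main()
    {
        var counts = await RunAsync();
        Console.WriteLine(string.Join(", ", counts));
    }
}
```

Because SelectMany does not wait for one task to finish before starting the next, this trades ordering guarantees for concurrency; if results must stay in buffer order, a different combinator would be needed.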
0

You could use the DistinctUntilChanged extension method, which suppresses consecutive repeats of the same value before they reach the buffer:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .DistinctUntilChanged()
    .Buffer(EVENT_BUFFER_DELAY)    
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);
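
Note that DistinctUntilChanged only drops a value when it equals the one immediately before it, so a sequence such as 1, 1, 2, 1 still yields 1 twice. The following sketch compares it with a per-buffer Distinct; it assumes the System.Reactive NuGet package, and `DistinctComparisonDemo` and `Collect` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DistinctComparisonDemo
{
    private static readonly TimeSpan Window = TimeSpan.FromSeconds(1); // stand-in for EVENT_BUFFER_DELAY

    // Pushes the values through the given pipeline and collects everything delivered.
    public static List<int> Collect(
        Func<IObservable<int>, IObservable<IList<int>>> pipeline,
        params int[] values)
    {
        var delivered = new List<int>();
        var subject = new Subject<int>();

        using (pipeline(subject).Subscribe(batch => delivered.AddRange(batch)))
        {
            foreach (var value in values) subject.OnNext(value);
            subject.OnCompleted(); // completing flushes the open buffer
        }

        return delivered;
    }

    // Suppresses only consecutive duplicates, before buffering.
    public static List<int> UntilChanged(params int[] values) =>
        Collect(s => s.DistinctUntilChanged()
                      .Buffer(Window)
                      .Where(b => b.Count > 0), values);

    // Removes all duplicates inside each buffer.
    public static List<int> PerBuffer(params int[] values) =>
        Collect(s => s.Buffer(Window)
                      .Where(b => b.Count > 0)
                      .Select(b => (IList<int>)b.Distinct().ToList()), values);

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", UntilChanged(1, 1, 2, 1)));
        Console.WriteLine(string.Join(", ", PerBuffer(1, 1, 2, 1)));
    }
}
```

DistinctUntilChanged also keeps its state across buffer boundaries: if the last value of one window equals the first value of the next, that value is left out of the second buffer.
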
ds-b