I have a hot observable that emits different numbers at random intervals, for example:

1--1-----1--1-2--4

I am looking for a way to handle duplicates that arrive within a predefined interval: take the duplicate item and merge it back into the sequence until it finds a slot outside the interval threshold. I have implemented a solution, but I believe it is not optimal. When I test it in production with real objects instead of integers, it creates a kind of back-pressure in the system and the CPU goes mad. Below is the test of what I have so far.

using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Xunit;
using Xunit.Abstractions;

namespace Specs{
    public class CollectDuplicatesSpecs : ReactiveTest{
        private readonly ITestOutputHelper _outputHelper;

        public CollectDuplicatesSpecs(ITestOutputHelper outputHelper){
            _outputHelper = outputHelper;
        }

        [Fact]
        public void MethodName(){
            var testScheduler = new TestScheduler();
            var hotObservable = testScheduler.CreateHotObservable(
                OnNext(10, 1), OnNext(20, 1), OnNext(30, 1), OnNext(40, 1));
            // Duplicates collected from each window are fed back into the pipeline through this subject.
            var subject = new Subject<int>();
            hotObservable.Merge(subject)
                .Window(TimeSpan.FromTicks(20), testScheduler)
                .Select(observable => {
                    // Side effect: re-inject this window's duplicates one tick later.
                    observable.CollectDuplicates(i => i).Delay(TimeSpan.FromTicks(1), testScheduler).Subscribe(subject);
                    // Only the first occurrence of each value in the window goes downstream.
                    return observable.Distinct();
                })
                .SelectMany(observable => observable)
                .Subscribe(i => _outputHelper.WriteLine($"{testScheduler.Clock}-{i}"));

            testScheduler.AdvanceBy(160);
        }
    }

    public static class RxEx{
        // Emits the items that Distinct() would drop as duplicates.
        public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector = null) {
            return Observable.Create<TSource>(observer => {
                var duplicateCollector = new DuplicateCollector<TSource>(keySelector);
                // Forward every duplicate detected by the comparer to the subscriber.
                var duplicateCollectorSubscription = duplicateCollector.Matches.Subscribe(observer);
                // Distinct() is subscribed only for its side effect: it calls the comparer, which reports matches.
                var disposable = source.Distinct(duplicateCollector).Finally(duplicateCollector.Dispose).Subscribe();
                return new CompositeDisposable(disposable, duplicateCollectorSubscription, duplicateCollector);
            });
        }
    }

    // Equality comparer that, as a side effect, publishes every item it finds to be a duplicate.
    public class DuplicateCollector<TSource> : IEqualityComparer<TSource>, IDisposable {
        private readonly Func<TSource, int> _keySelector;
        readonly Subject<TSource> _matches = new Subject<TSource>();

        public DuplicateCollector(Func<TSource, int> keySelector) {
            _keySelector = keySelector;
        }

        public IObservable<TSource> Matches => _matches;

        public bool Equals(TSource x, TSource y) {
            var equals = IsMatch(x, y);
            if (equals)
                _matches.OnNext(x);
            return equals;
        }

        private bool IsMatch(TSource x, TSource y) {
            if (_keySelector != null)
                return _keySelector(x).Equals(_keySelector(y));
            var equals = x != null && x.Equals(y);
            return equals;
        }

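        public int GetHashCode(TSource obj) {
            // keySelector is optional, so fall back to the item's own hash code when it is null.
            if (_keySelector != null)
                return _keySelector(obj);
            return obj == null ? 0 : obj.GetHashCode();
        }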

        public void Dispose(){
            _matches?.Dispose();
        }
    }


}

which prints

10-1
21-1
40-1
60-1

Apostolis Bekiaris
    What does "merge it back to the sequence until it finds space to bypass the interval threshold" mean? – Enigmativity Mar 06 '18 at 09:15
  • In the marble above, say we have a threshold of 2 dashes; then the second and fourth 1 should be removed from the sequence. Those two skipped items must not be dropped forever, but emitted later, once the gap to the previous 1 is larger than 2 dashes. – Apostolis Bekiaris Mar 06 '18 at 14:02
  • I'm glad you've got an answer, because it's still not clear to me what you're asking. – Enigmativity Mar 06 '18 at 22:48

1 Answer

I'm struggling to understand what you want; some marble diagrams would help. I'm assuming you essentially want something like a smoothing operator: if messages come in bursts, smooth them out over time somehow.

Based on this answer, you can create an operator that handles the smoothing:

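using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableDrainExtensions
{
    // Emits each item immediately, then keeps the gate closed for ts before the next item may go out.
    public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
    {
        return source.Drain(x => Observable.Empty<T>().Delay(ts, scheduler).StartWith(x));
    }

    public static IObservable<T> TimeDrained<T>(this IObservable<T> source, TimeSpan ts)
    {
        return TimeDrained(source, ts, Scheduler.Default);
    }

    // Processes items one at a time: the selector for an item is subscribed only after the
    // previous item's inner observable has completed. Pending items are buffered by Zip.
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            // Starts with one value so that the first item passes straight through.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(Unit.Default);

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    // When this item's inner sequence completes, release the next item.
                    .Do(_ => { }, () => queue.OnNext(Unit.Default))
                );
        });
    }
}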
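As a rough mental model of what TimeDrained does on a single, ungrouped stream, here is a plain-C# sketch with no Rx involved (TimeDrainedModel and EmissionTicks are hypothetical names). It assumes that the inner Empty().Delay(ts) completes exactly ts ticks after it is subscribed, as it does under a TestScheduler, so each item is emitted at the later of its arrival tick and the previous emission tick plus the gap:

```csharp
using System;
using System.Collections.Generic;

// Plain-C# model (no Rx) of source.TimeDrained(gap) on a single, ungrouped stream.
// Assumption: the delayed completion lands exactly `gap` ticks after subscription.
public static class TimeDrainedModel
{
    // arrivals: the tick at which each item arrives, in arrival order.
    // Returns the tick at which each item is emitted.
    public static List<long> EmissionTicks(IEnumerable<long> arrivals, long gap)
    {
        var emitted = new List<long>();
        // The BehaviorSubject's initial value means the gate starts open.
        long gateOpensAt = long.MinValue;

        foreach (var arrival in arrivals)
        {
            // Zip releases an item once it has arrived AND the previous inner sequence has completed.
            long emitAt = Math.Max(arrival, gateOpensAt);
            emitted.Add(emitAt);
            // StartWith(x) emits at once; Empty().Delay(gap) completes gap ticks later and reopens the gate.
            gateOpensAt = emitAt + gap;
        }
        return emitted;
    }
}
```

For a burst arriving at ticks 10, 12 and 14 with a gap of 20, this gives 10, 30 and 50. Without grouping, every item waits its turn regardless of its value.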

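Drain processes items strictly one after another: an item is released only once the previous item's inner observable has completed. TimeDrained uses this to keep consecutive emissions at least the given TimeSpan apart. Combine it with GroupBy so that the spacing applies per distinct value: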

[Fact]
public void MethodName()
{
    var testScheduler = new TestScheduler();
    var hotObservable = testScheduler.CreateHotObservable(
        OnNext(10, 1), 
        OnNext(20, 1), 
        OnNext(30, 1), 
        OnNext(40, 1)
    );

    var ts = TimeSpan.FromTicks(20);

    hotObservable
        .GroupBy(i => i)            //comparison key
        .Select(g => g.TimeDrained(ts, testScheduler))
        .Merge()
        .Subscribe(i => Console.WriteLine($"{testScheduler.Clock}-{i}"));

    testScheduler.AdvanceBy(160);
}

Output is:

10-1
30-1
50-1
70-1
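To see where 10, 30, 50 and 70 come from, here is a similar plain-C# sketch of the GroupBy + TimeDrained + Merge pipeline (PerKeyDrainModel is a hypothetical name, not an Rx API). Each value gets its own gate, so only duplicates wait for each other. It assumes arrivals are given in tick order, and ties between different values at the same tick are simply kept in arrival order, which is an approximation of how the scheduler would interleave them:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain-C# model (no Rx) of source.GroupBy(i => i).Select(g => g.TimeDrained(gap)).Merge().
// Each value has its own gate, so only duplicates of the same value delay each other.
public static class PerKeyDrainModel
{
    public static List<(long Tick, int Value)> Emissions(IEnumerable<(long Tick, int Value)> arrivals, long gap)
    {
        // Per value: the tick at which the next duplicate is allowed out.
        var gateOpensAt = new Dictionary<int, long>();
        var emitted = new List<(long Tick, int Value)>();

        foreach (var (tick, value) in arrivals)
        {
            long emitAt = gateOpensAt.TryGetValue(value, out var opensAt)
                ? Math.Max(tick, opensAt)
                : tick; // first occurrence of this value passes immediately
            emitted.Add((emitAt, value));
            gateOpensAt[value] = emitAt + gap;
        }

        // Merge() interleaves the groups in time order; OrderBy is stable, so ties keep arrival order.
        return emitted.OrderBy(e => e.Tick).ToList();
    }
}
```

Feeding it (10, 1), (20, 1), (30, 1), (40, 1) with a gap of 20 reproduces 10-1, 30-1, 50-1, 70-1. With interleaved values (10, 1), (15, 2), (20, 1), (25, 2), the 2s are not held back by the 1s, giving 10-1, 15-2, 30-1, 35-2.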

If this isn't what you're looking for, please clarify the question.

Shlomo