I have a hot observable that emits different numbers at random intervals, for example:
1--1-----1--1-2--4
I am looking for a way, when there are duplicates within a predefined interval, to take the duplicate item and merge it back into the sequence until it finds a slot where it no longer falls within the interval threshold. I have implemented a solution, but I believe it is not optimal: when I test it in production with real objects instead of integers, it creates a kind of back-pressure in the system and I see the CPU usage spike. Below is the test of what I have so far.
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using Xunit;
using Xunit.Abstractions;
namespace Specs{
/// <summary>
/// Exploratory spec: feeds a hot sequence of identical values through
/// fixed-size time windows, re-injecting the duplicates found in each window
/// back into the input, and logs every value that comes out.
/// </summary>
public class CollectDuplicatesSpecs : ReactiveTest
{
    private readonly ITestOutputHelper _outputHelper;

    public CollectDuplicatesSpecs(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
    }

    /// <summary>
    /// Emits the value 1 at ticks 10, 20, 30 and 40, windows the merged input
    /// every 20 ticks, and writes "{clock}-{value}" for each distinct value
    /// per window. The test has no assertions; it only prints.
    /// </summary>
    [Fact]
    public void MethodName()
    {
        var testScheduler = new TestScheduler();
        var input = testScheduler.CreateHotObservable(
            OnNext(10, 1),
            OnNext(20, 1),
            OnNext(30, 1),
            OnNext(40, 1));

        // Duplicates detected inside a window are pushed into this subject,
        // which is merged with the original input below.
        var feedback = new Subject<int>();

        var windows = input
            .Merge(feedback)
            .Window(TimeSpan.FromTicks(20), testScheduler);

        var distinctPerWindow = windows.Select(window =>
        {
            // Side subscription: each duplicate in this window is re-emitted
            // into the feedback subject one tick later.
            window
                .CollectDuplicates(i => i)
                .Delay(TimeSpan.FromTicks(1), testScheduler)
                .Subscribe(feedback);

            return window.Distinct();
        });

        distinctPerWindow
            .SelectMany(window => window)
            .Subscribe(i => _outputHelper.WriteLine($"{testScheduler.Clock}-{i}"));

        testScheduler.AdvanceBy(160);
    }
}
/// <summary>
/// Reactive extension helpers.
/// </summary>
public static class RxEx
{
    /// <summary>
    /// Returns a sequence of the values that a <c>Distinct</c> pass over
    /// <paramref name="source"/> reports as duplicates, as published by a
    /// per-subscription <see cref="DubplicateCollector{TSource}"/>.
    /// </summary>
    /// <param name="source">The sequence to inspect for duplicates.</param>
    /// <param name="keySelector">
    /// Optional projection to an integer key used to compare elements;
    /// passed straight to the collector.
    /// </param>
    /// <returns>
    /// An observable that forwards the collector's matches to the subscriber.
    /// </returns>
    public static IObservable<TSource> CollectDuplicates<TSource>(this IObservable<TSource> source, Func<TSource, int> keySelector = null) =>
        Observable.Create<TSource>(observer =>
        {
            var collector = new DubplicateCollector<TSource>(keySelector);

            // Subscribe the downstream observer to the matches first, so
            // nothing reported while the source is being consumed is missed.
            var matchesSubscription = collector.Matches.Subscribe(observer);

            // The distinct output itself is discarded; the pass exists only
            // to drive the comparer. The collector is disposed when the
            // source pipeline terminates or is unsubscribed.
            var sourceSubscription = source
                .Distinct(collector)
                .Finally(collector.Dispose)
                .Subscribe();

            return new CompositeDisposable
            {
                sourceSubscription,
                matchesSubscription,
                collector
            };
        });
}
/// <summary>
/// Equality comparer that, in addition to comparing two elements, publishes
/// an element on <see cref="Matches"/> every time a comparison succeeds.
/// </summary>
/// <typeparam name="TSource">Type of the compared elements.</typeparam>
public class DubplicateCollector<TSource> : IEqualityComparer<TSource>, IDisposable
{
    private readonly Func<TSource, int> _keySelector;
    private readonly Subject<TSource> _matches = new Subject<TSource>();

    /// <summary>
    /// Creates a collector.
    /// </summary>
    /// <param name="keySelector">
    /// Optional projection to an integer key. When <c>null</c>, elements are
    /// compared and hashed with their own <c>Equals</c>/<c>GetHashCode</c>.
    /// </param>
    public DubplicateCollector(Func<TSource, int> keySelector)
    {
        _keySelector = keySelector;
    }

    /// <summary>
    /// Stream of elements for which <see cref="Equals(TSource, TSource)"/>
    /// returned <c>true</c>.
    /// </summary>
    public IObservable<TSource> Matches => _matches;

    /// <summary>
    /// Compares two elements and, when they match, publishes
    /// <paramref name="x"/> on <see cref="Matches"/>.
    /// </summary>
    /// <returns><c>true</c> when the elements match.</returns>
    public bool Equals(TSource x, TSource y)
    {
        var equals = IsMatch(x, y);
        if (equals)
        {
            // NOTE(review): the first argument is the one published. Which of
            // the two is the newly arrived element depends on how the calling
            // collection orders the arguments - confirm this is the intended one.
            _matches.OnNext(x);
        }

        return equals;
    }

    // Compares by key when a key selector was supplied; otherwise by the
    // element's own Equals. A null x never matches in the fallback path.
    private bool IsMatch(TSource x, TSource y)
    {
        if (_keySelector != null)
        {
            return _keySelector(x).Equals(_keySelector(y));
        }

        return x != null && x.Equals(y);
    }

    /// <summary>
    /// Returns the hash used to bucket <paramref name="obj"/>: the selected
    /// key when a key selector was supplied, otherwise the element's own
    /// hash code (0 for <c>null</c>).
    /// </summary>
    public int GetHashCode(TSource obj)
    {
        // The key selector is optional; without this guard the comparer threw
        // NullReferenceException on the first element when none was supplied.
        if (_keySelector != null)
        {
            return _keySelector(obj);
        }

        return obj == null ? 0 : obj.GetHashCode();
    }

    /// <summary>
    /// Disposes the underlying matches subject.
    /// </summary>
    public void Dispose()
    {
        _matches.Dispose();
    }
}
}
which prints
10-1
21-1
40-1
60-1