Given a source provider like below:
IObservable<ISource> Sources();
with each ISource looking like below:
IObservable<IEnumerable<string>> ObserveData(string filter)
I'd like to return:
IObservable<IEnumerable<string>> Results
when a given string is returned from all ISources. Essentially I want the intersection of all the sources.
If a new source is added then everything should re-evaluate.
I'm struggling to come up with a generic solution to this. Most solutions I've seen have a well known number of sources. Any ideas appreciated.
Answer Ok after thinking for a while longer I came up with my answer. Possibly it can be improved on but it seems to work for me so I'll post it here for reference in case someone else has a similar issue. Thanks to ibebbs and Shlomo for taking the time to reply, much appreciated.
//Arrange
var s1 = Substitute.For<ISource>();
s1.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" }));
var s2 = Substitute.For<ISource>();
s2.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" }));
var s3 = Substitute.For<ISource>();
s3.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" }));
var expected = new[] { "b", "d" };
var sources = new[] { s1, s2, s3 }.ToObservable();
var scheduler = new TestScheduler();
var observer = scheduler.CreateObserver<IList<string>>();
//Act
sources.Buffer(TimeSpan.FromMilliseconds(500), scheduler)
.Select(s => Observable.CombineLatest(s.Select(x => x.ObserveData("NoFilter"))))
.Switch()
.Select(x =>IntersectAll(x))
.Do(x => Console.WriteLine($"Recieved {string.Join("," , x)}"))
.Subscribe(observer);
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500).Ticks);
//Assert
observer.Messages.AssertEqual(
OnNext<IList<string>>(0, s => s.SequenceEqual(expected)),
OnCompleted<IList<string>>(0));
For IntersectAll, see Intersection of multiple lists with IEnumerable.Intersect()