0

Given a source provider like below:

IObservable<ISource> Sources();

with each ISource looking like below:

IObservable<IEnumerable<string>> ObserveData(string filter)

I'd like to return:

IObservable<IEnumerable<string>> Results

when a given string is returned from all ISources. Essentially I want the intersection of all the sources.

If a new source is added then everything should re-evaluate.

I'm struggling to come up with a generic solution to this. Most solutions I've seen have a well known number of sources. Any ideas appreciated.

Answer: OK, after thinking for a while longer, I came up with my own answer. It can possibly be improved on, but it seems to work for me, so I'll post it here for reference in case someone else has a similar issue. Thanks to ibebbs and Shlomo for taking the time to reply; much appreciated.

 // Arrange: three stubbed sources; "b" and "d" are the only strings present in all of them.
        var first = Substitute.For<ISource>();
        first.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "a", "b", "c", "d" }));

        var second = Substitute.For<ISource>();
        second.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "b", "xx", "c", "d" }));

        var third = Substitute.For<ISource>();
        third.ObserveData(Arg.Any<string>()).Returns(Observable.Return(new[] { "yy", "b", "ff", "d" }));

        var expected = new[] { "b", "d" };
        var sources = new[] { first, second, third }.ToObservable();

        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<IList<string>>();
        var window = TimeSpan.FromMilliseconds(500);

        // Act: batch the sources per time window, combine the latest data of every
        // source in the batch, and reduce each combination to its intersection.
        var batches = sources.Buffer(window, scheduler);
        var latestPerBatch = batches
            .Select(batch => Observable.CombineLatest(batch.Select(source => source.ObserveData("NoFilter"))))
            .Switch();

        latestPerBatch
            .Select(lists => IntersectAll(lists))
            .Do(result => Console.WriteLine($"Recieved {string.Join("," , result)}"))
            .Subscribe(observer);

        scheduler.AdvanceBy(window.Ticks);

        // Assert: exactly one result followed by completion, both recorded at virtual time 0.
        observer.Messages.AssertEqual(
            OnNext<IList<string>>(0, actual => actual.SequenceEqual(expected)),
            OnCompleted<IList<string>>(0));

For IntersectAll, see Intersection of multiple lists with IEnumerable.Intersect()

Community
  • 1
  • 1
user630190
  • 1,142
  • 2
  • 11
  • 26
  • Is Results a property or a method? If a property, what should be passed in for the filter argument? – Shlomo Sep 06 '16 at 18:46

2 Answers2

1

Ok, second attempt and I'm pretty sure this is what you need (test fixture included at the bottom):

/// <summary>
/// A provider of string data that can be observed over time.
/// </summary>
public interface ISource
{
    /// <summary>
    /// Returns an observable sequence in which every notification is a collection of strings.
    /// </summary>
    /// <param name="filter">
    /// Filter for the observed data. How (or whether) it is applied is up to the
    /// implementation; the contract is not specified here.
    /// </param>
    /// <returns>An observable of string collections.</returns>
    IObservable<IEnumerable<string>> ObserveData(string filter);
}

/// <summary>
/// Extension methods for combining the data of an arbitrary, growing set of <see cref="ISource"/>s.
/// </summary>
public static class ArbitrarySources
{
    /// <summary>
    /// Emits the intersection of the most recent values of every source seen so far.
    /// </summary>
    /// <remarks>
    /// Each source is subscribed to once, when it arrives. The latest value of every
    /// source is kept in a dictionary keyed by the source's arrival index, and the
    /// intersection is recomputed each time any source emits. A source that has not
    /// emitted yet does not take part in the intersection.
    /// </remarks>
    /// <param name="sourceObservable">The stream of sources to combine.</param>
    /// <param name="filter">Passed unchanged to <see cref="ISource.ObserveData"/> of every source.</param>
    /// <returns>
    /// An observable that emits an array with the strings common to the latest value
    /// of every source that has emitted so far; an empty array when nothing is common.
    /// </returns>
    public static IObservable<IEnumerable<string>> Intersection(this IObservable<ISource> sourceObservable, string filter)
    {
        return sourceObservable
            .SelectMany((source, index) => source.ObserveData(filter).Select(values => new { Index = index, Values = values }))
            .Scan(ImmutableDictionary<int, IEnumerable<string>>.Empty, (agg, tuple) => agg.SetItem(tuple.Index, tuple.Values))
            .Select(dictionary => IntersectAll(dictionary.Values));
    }

    /// <summary>
    /// Intersects all given collections; returns an empty array when there are none.
    /// </summary>
    private static string[] IntersectAll(IEnumerable<IEnumerable<string>> collections)
    {
        // null means "no collection seen yet". An empty sequence must NOT be used as
        // that sentinel: an intersection that has legitimately become empty would
        // otherwise be replaced by the next collection instead of staying empty.
        IEnumerable<string> result = null;

        foreach (var values in collections)
        {
            result = result == null ? values : result.Intersect(values);
        }

        return result == null ? Array.Empty<string>() : result.ToArray();
    }
}

/// <summary>
/// Verifies <c>Intersection</c> against sources driven by a virtual-time scheduler.
/// </summary>
public class IntersectionTest
{
    /// <summary>
    /// Test double that hands back the observable it was constructed with and ignores the filter.
    /// </summary>
    internal class Source : ISource
    {
        private readonly IObservable<IEnumerable<string>> _observable;

        public Source(IObservable<IEnumerable<string>> observable)
        {
            _observable = observable;
        }

        public IObservable<IEnumerable<string>> ObserveData(string filter) => _observable;
    }

    /// <summary>
    /// Builds a recorded OnNext notification at the given offset in seconds.
    /// </summary>
    private static Recorded<Notification<T>> NextAt<T>(int seconds, T value)
    {
        return new Recorded<Notification<T>>(TimeSpan.FromSeconds(seconds).Ticks, Notification.CreateOnNext<T>(value));
    }

    [Fact]
    public void ShouldIntersectValues()
    {
        // Arrange: two cold data streams and a cold stream that publishes the sources one second apart.
        var scheduler = new TestScheduler();

        ISource first = new Source(scheduler.CreateColdObservable(
            NextAt<IEnumerable<string>>(1, new string[] { "a", "b" }),
            NextAt<IEnumerable<string>>(3, new string[] { "a", "b", "c" })));

        ISource second = new Source(scheduler.CreateColdObservable(
            NextAt<IEnumerable<string>>(1, new string[] { "a", "c" }),
            NextAt<IEnumerable<string>>(3, new string[] { "b", "c" })));

        var sourceStream = scheduler.CreateColdObservable(
            NextAt<ISource>(1, first),
            NextAt<ISource>(2, second));

        // Act: create and subscribe at tick 0, dispose after six seconds of virtual time.
        var recorder = scheduler.Start(() => sourceStream.Intersection("test"), 0, 0, TimeSpan.FromSeconds(6).Ticks);

        // Keep only the payloads of the OnNext notifications.
        var actual = new List<IEnumerable<string>>();
        foreach (var message in recorder.Messages)
        {
            var notification = message.Value;
            if (notification.Kind == NotificationKind.OnNext && notification.HasValue)
            {
                actual.Add(notification.Value);
            }
        }

        // Assert
        var expected = new IEnumerable<string>[]
        {
            new[] { "a", "b" },
            new[] { "a" },
            new[] { "a", "c" },
            new[] { "b", "c" }
        };

        Assert.Equal(expected.Length, actual.Count);

        for (var i = 0; i < expected.Length; i++)
        {
            Assert.Equal(expected[i], actual[i]);
        }
    }
}

This approach has the added benefit of not re-querying existing sources when a new source is added but will recompute the intersection each time any source emits a value.

ibebbs
  • 1,963
  • 2
  • 13
  • 20
0

How about this:

/// <summary>
/// Merges the data streams of all sources emitted so far into a single stream.
/// </summary>
/// <remarks>
/// For every new source, a new merged observable is built from the previous merged
/// observable plus the new source's data, and the result switches to it.
/// NOTE: this merges the sources' outputs; it does not compute their intersection.
/// NOTE(review): switching to the new merged observable presumably re-invokes the
/// data streams of the earlier sources as well - confirm before relying on it.
/// </remarks>
/// <param name="sources">The stream of sources to combine.</param>
/// <param name="filter">Passed unchanged to <c>ObserveData</c> of every source.</param>
/// <returns>An observable carrying the values emitted by any of the sources seen so far.</returns>
public static IObservable<IEnumerable<string>> From(this IObservable<ISource> sources, string filter)
{
    // 'static' is required: a method with a 'this' parameter is an extension method
    // and must be a static member of a static class.
    return sources
        .Scan(Observable.Empty<IEnumerable<string>>(), (agg, source) => Observable.Merge(agg, source.ObserveData(filter)))
        .Switch();
}

Be aware that every time a new source is emitted from sources, all the sources that have been emitted previously will have their ObserveData method called again. Therefore this solution doesn't scale particularly well, but it does meet your 'If a new source is added then everything should re-evaluate' requirement.

ibebbs
  • 1,963
  • 2
  • 13
  • 20
  • On first glance it looks like it will merge all the sources rather than find the intersection. So if source one outputs ["a","b","c"] and source two outputs ["b","a"] I'd only want to get ["a","b"]. – user630190 Sep 06 '16 at 16:59