I would like to combine multiple observables, where each one returns a single Update object, into a single dictionary object.

Here is a sample of what I am trying to achieve:

private IObservable<IDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
{
  var codeObservables = product.Codes.Select(code => CreateUpdateStream(code)).ToList();

  //??? 
  return codeObservables.Merge().Select(update => ...);
}


private IObservable<IUpdate> CreateUpdateStream(string code)
{
  ...
  //return an observable of IUpdate
}
  • I want to combine all of the IUpdates as they come in into a single, updating dictionary where the key = Code and Value = IUpdate
  • The caller of CreateUpdateStreams will know the Product and will want to change some properties of each Code object depending on the update. For example:

Product = Foo

Product.Codes = {Code1, Code2, Code3}

IDictionary = {Code1, "a"}, {Code2, "b"}, {Code3, "c"}

Depending on the value of the updates (in this case a/b/c) a different change will be made to the corresponding Code, for example set a property like Code.State = "a", etc.

Since each of the codeObservables will update at different rates, Merge seemed like the sensible starting point. I am not sure though how to have the updates from the individual observables update a dictionary object, which retains past values.

Flack
  • What does `IUpdate` look like? – Enigmativity Jan 21 '15 at 05:43
  • If I understand correctly, you need Product.Codes.State. If so, rethink your implementation and use a dictionary of tuples (tuples of objects, not just strings): http://www.codeproject.com/Articles/193537/C-Tuples or http://stackoverflow.com/questions/955982/tuples-or-arrays-as-dictionary-keys-in-c-sharp – SilentTremor Jan 21 '15 at 08:27

1 Answer

Here's a shot at your problem. It takes advantage of anonymous types to pair each update with its code, and it relies on side-effecting a dictionary. Note that since Rx delivers the merged notifications sequentially, there is no need for synchronization on the dictionary.

private IObservable<IReadOnlyDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
{
    var dictionary = new Dictionary<string, IUpdate>();
    return product.Codes
        // Tag each update with the code that produced it.
        .Select(code => CreateUpdateStream(code).Select(update => new { Update = update, Code = code }))
        .Merge()
        // The indexer adds or overwrites; Add would throw on a code's second update.
        .Do(element => dictionary[element.Code] = element.Update)
        .Select(_ => dictionary);
}

Note that I have changed the method signature to return IObservable<IReadOnlyDictionary<,>> to keep client code from tampering with the dictionary. Subscribers still receive the same instance each time, so its contents change as later updates arrive. Another option is to return a new copy of the dictionary every time. Each emitted value is then an independent snapshot, at the cost of one copy per update (which can matter for large dictionaries), like so:

private IObservable<IDictionary<string, IUpdate>> CreateUpdateStreams(Product product)
    {
        var dictionary = new Dictionary<string, IUpdate>();
        return
            product.Codes.Select(
                code => CreateUpdateStream(code).Select(update => new {Update = update, Code = code}))
                .Merge()
                .Select(element =>
                {
                    // The indexer adds or overwrites; Add would throw on a code's second update.
                    dictionary[element.Code] = element.Update;
                    return new Dictionary<string, IUpdate>(dictionary);
                });
    }
Jonas Chapuis
  • Replace your local `dictionary` variable and `Do` and `Select` with `Scan(new Dictionary<string, IUpdate>(), (dict, e) => { dict[e.Code] = e.Update; return dict; })`. Also, if immutability is important, use an [ImmutableDictionary](http://blogs.msdn.com/b/bclteam/p/immutable.aspx) – Brandon Jan 21 '15 at 14:23
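
The `Scan` and `ImmutableDictionary` suggestion in the comment above could look roughly like the sketch below. It is an illustration under stated assumptions, not the commenter's exact code: `UpdateStreams`, `ToSnapshots`, and `Demo` are hypothetical names, plain strings stand in for `IUpdate` (whose shape the question does not show), and the System.Reactive and System.Collections.Immutable packages are assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class UpdateStreams
{
    // Hypothetical helper: merges per-code streams and emits an immutable
    // snapshot of the latest update per code after every incoming update.
    public static IObservable<ImmutableDictionary<string, TUpdate>> ToSnapshots<TUpdate>(
        IEnumerable<KeyValuePair<string, IObservable<TUpdate>>> streams)
    {
        return streams
            // Tag each update with the code that produced it.
            .Select(s => s.Value.Select(update => new { Code = s.Key, Update = update }))
            .Merge()
            // SetItem returns a new dictionary with the key added or replaced,
            // so snapshots emitted earlier are never modified.
            .Scan(
                ImmutableDictionary<string, TUpdate>.Empty,
                (dict, e) => dict.SetItem(e.Code, e.Update));
    }
}

public static class Demo
{
    public static void Main()
    {
        // Subjects stand in for the real per-code update streams.
        var code1 = new Subject<string>();
        var code2 = new Subject<string>();
        var streams = new Dictionary<string, IObservable<string>>
        {
            { "Code1", code1 },
            { "Code2", code2 },
        };

        var snapshots = new List<ImmutableDictionary<string, string>>();
        using (UpdateStreams.ToSnapshots<string>(streams).Subscribe(snapshots.Add))
        {
            code1.OnNext("a");
            code2.OnNext("b");
            code1.OnNext("c"); // second update for Code1 replaces "a"
        }

        Console.WriteLine(snapshots.Count);       // 3
        Console.WriteLine(snapshots[1]["Code1"]); // a (earlier snapshot is unchanged)
        Console.WriteLine(snapshots[2]["Code1"]); // c
    }
}
```

Because the accumulator lives inside `Scan`, each subscription starts from its own empty dictionary instead of sharing one captured variable, and no defensive copy is needed before handing a snapshot to the caller.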