I have a collection of observables that generate state changes for a so-called Channel. And I have a ChannelSet that should monitor those channels.

I would like to write something like this: if one channel is operational, the channel set is up, else, the channel set is down.

IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;

But where do I get my IEnumerable<ChannelState>? If I have 1 channel, I can simply subscribe to its state changes and modify the state of the channel set accordingly. For two channels, I could use CombineLatest:

Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
    {
        // cs0 and cs1 are ChannelState values, so compare against ChannelState.Operational
        if (cs0 == ChannelState.Operational || cs1 == ChannelState.Operational)
            return ChannelSetState.Up;
        else
            return ChannelSetState.Down;
    });

But I have an IEnumerable<Channel> and a corresponding IEnumerable<IObservable<ChannelState>>. I'm looking for something like CombineLatest that is not limited to a fixed number of observables.

To complicate matters, the collection of channels can be added to and removed from. So once in a while, a channel will be added for example. The new channel also generates state changes that need to be incorporated.

So what I'm actually looking for is a function:

IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

that keeps up-to-date when the input changes. There should be some way to accomplish this using Rx but I can't really figure out how.

Ronald Wildenberg
  • Can you add details on how new Channels are created? – Scott Weinstein Jul 25 '11 at 12:43
  • Seems like in this case you need a combination of CombineLatest and some sort of accumulator. I have this problem all of the time with things like validation (where all objects need to be valid at the same time for the composite validation to be valid). This is a great question. – Anderson Imes Jul 25 '11 at 19:22
  • @Scott What do you want to know about channel creation? They are rather complex objects that wrap a TCP connection. – Ronald Wildenberg Jul 25 '11 at 21:24
  • @Anderson Thanks. I have come up with a solution that I'll post tomorrow. Not entirely satisfied because I think a more elegant solution should be possible. – Ronald Wildenberg Jul 25 '11 at 21:26
  • Don't need to know the insides of the Channel ctors, but what the use case is for when/how they are created. Basically, can we start w/ an `IObservable<Channel>` and solve from that point? – Scott Weinstein Jul 25 '11 at 22:54
  • I see what you mean. When starting with an `IObservable<Channel>`, channels can be added dynamically. However, I'm not sure how removing should work then. Going to check out Enigmativity's answer now to see if he has a solution for that. Thanks. – Ronald Wildenberg Jul 26 '11 at 05:56
  • @Anderson I think Enigmativity's answer matches your problem. If you have observables that generate binary states (e.g. valid/invalid) it's a very nice solution. – Ronald Wildenberg Jul 26 '11 at 08:20
  • @Ronald Wildenberg: I'll give it a try. Something about ignoring the enumerable of channelstate sources makes me think this isn't appropriate. If any one of the sources is `false` at any given time, the whole thing should be `false`... making each source indistinct using Merge seems like it wouldn't work, but I'll reserve judgement until after I test. – Anderson Imes Jul 26 '11 at 13:33

3 Answers

There's a fairly straightforward way to do what you want with Rx, but you need to think in terms of observables only and not mix in enumerables.

The function signature that you really need to think in terms of is:

IObservable<IObservable<ChannelState>> --> IObservable<ChannelSetState>

Here's the function:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

It is important that each IObservable<ChannelState> in the IObservable<IObservable<ChannelState>> behaves properly for this to work.

I've assumed that the ChannelState enum has an Idle state and that each IObservable<ChannelState> will produce zero or more pairs of Operational/Idle values (Operational followed by Idle) before completing.

Also, you said "the collection of channels can be added to and removed from". Thinking in terms of IEnumerable<IObservable<ChannelState>>, that sounds reasonable, but in Rx you don't have to worry about removals because each observable can signal its own completion. Once it completes, it is as if it has been removed from the collection, because it cannot produce any further values. So you only need to worry about adding to the collection, which is easy using subjects.

So now the function can be used like so:

var channelStatesSubject = new Subject<IObservable<ChannelState>>();
var channelStates = channelStatesSubject.AsObservable();
var channelSetStates = f(channelStates);

channelSetStates.Subscribe(css => { /* ChannelSetState subscription code */ });

channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
channelStatesSubject.OnNext(/* IObservable<ChannelState> */);
// etc

I ran this using some test code that used three random ChannelState observables, with a Do call in the f function for debugging, and got the following sequence:

1
Up
2
3
2
1
2
1
0
Down
1
Up
0
Down

I think that's what you're after. Let me know if I've missed anything.


As per the comments below, the ChannelState enum has multiple states, but only Operational means that the connection is up. So it's very easy to add a DistinctUntilChanged operator to hide multiple "down" states. Here's the code now:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();

Added code to ensure that the first select query always starts with a 1. Here's the code now:

Func<IObservable<IObservable<ChannelState>>, IObservable<ChannelSetState>> f =
    channelStates =>
        channelStates
            .Merge()
            .Select(cs => cs == ChannelState.Operational ? 1 : -1)
            .StartWith(1)
            .DistinctUntilChanged()
            .Scan(0, (cssn, csn) => cssn + csn)
            .Select(cssn => cssn > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .DistinctUntilChanged();
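
One caveat raised in the comments is that Merge discards which channel produced each value, so a DistinctUntilChanged applied after Merge can collapse two different channels becoming Operational into a single +1. A possible variation, offered only as a sketch, is to convert each channel into +1/-1 transitions before merging. The names ChannelSetMonitor, Transitions and Monitor are hypothetical, the ChannelState enum is abbreviated to the states named in the comments, and the choices to treat a completed channel as down and to start the set as Down are assumptions rather than part of the answer above.

```csharp
using System;
using System.Reactive.Linq;

// Abbreviated enums; the real ChannelState has more members.
public enum ChannelState { Creating, Created, SettingUp, Operational }
public enum ChannelSetState { Down, Up }

public static class ChannelSetMonitor
{
    // Maps one channel's states to +1 when it becomes Operational
    // and -1 when it leaves Operational (or completes while Operational).
    private static IObservable<int> Transitions(IObservable<ChannelState> states)
    {
        return states
            .Select(cs => cs == ChannelState.Operational)
            .Concat(Observable.Return(false)) // assumption: a completed channel counts as down
            .StartWith(false)                 // every channel starts out "not operational"
            .DistinctUntilChanged()           // keep only real up/down changes for this channel
            .Skip(1)                          // drop the artificial initial false
            .Select(up => up ? 1 : -1);
    }

    // Keeps a running count of operational channels across all channels.
    public static IObservable<ChannelSetState> Monitor(
        IObservable<IObservable<ChannelState>> channels)
    {
        return channels
            .Select(states => Transitions(states))
            .Merge()
            .Scan(0, (count, delta) => count + delta)
            .Select(count => count > 0 ? ChannelSetState.Up : ChannelSetState.Down)
            .StartWith(ChannelSetState.Down)  // assumption: the set is Down until a channel is up
            .DistinctUntilChanged();
    }
}
```

Because the de-duplication happens per channel, the running total is the number of channels currently in the Operational state, so it does not matter how many non-operational states a channel cycles through.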
Enigmativity
  • Interesting solution. It seems like everything I need is there. I hadn't realized that completing an IObservable effectively means removing it from the 'list'. This is a lot better than the solution I came up with. – Ronald Wildenberg Jul 26 '11 at 06:01
  • @Ronald - Completing on an `IObservable<ChannelState>` within a merged `IObservable<IObservable<ChannelState>>` effectively means it's removed from the list. Completing on an `IObservable<ChannelState>` by itself just means it won't produce any more values. – Enigmativity Jul 26 '11 at 06:09
  • Hm, I still have one problem. My `IObservable<ChannelState>` is actually generated from a call to `Observable.FromEventPattern`. How do you complete such an observable? – Ronald Wildenberg Jul 26 '11 at 07:06
  • And another problem: I actually have eight different channel states. So there is no sequence of (`Idle`/`Operational`) pairs in the event stream, which means that the `Scan` function scans for the wrong things. Not sure if that can be fixed in your solution. Because the event streams are merged, you lose some essential information: the channel that generated the event. – Ronald Wildenberg Jul 26 '11 at 07:48
  • Not sure if this will work. Channels can cycle through a number of states, one of which is `Operational`. – Ronald Wildenberg Jul 26 '11 at 08:17
  • @Ronald - The key thing here is that the channel goes into an operational state and then out of it before it can go back in. Of the eight states can you identify which are in and which are out? If so, my solution can still work. – Enigmativity Jul 26 '11 at 13:18
  • @Ronald - The `FromEventPattern` should work fine. You can use the `TakeWhile` extension method to introduce a flag to stop the observable. – Enigmativity Jul 26 '11 at 13:36
  • There is one state signaling an operational channel: `ChannelState.Operational`. All the other states (`Creating`, `Created`, `SettingUp`, ...) are an indication that the channel should not be used. – Ronald Wildenberg Jul 26 '11 at 13:58
  • @Ronald - If `Operational` means `Up` and all others mean `Down` then my code still works. – Enigmativity Jul 27 '11 at 00:50
  • If I cycle through `Creating` -> `Created` -> `SettingUp` -> `Operational` for one channel, the stream contains `-1,-1,-1,1`. The `Scan` operation then produces `-1,-2,-3,-2`. The final value, `-2`, doesn't lead to `ChannelSetState.Up`. Or maybe I have made a mistake somewhere? – Ronald Wildenberg Jul 27 '11 at 05:18
  • Ah, I hadn't seen the update to your answer. Again a very elegant solution, thanks. I'll check if it now works as I want. – Ronald Wildenberg Jul 27 '11 at 06:45
  • Still a problem :) If the first state is a down state, then you may never end up with a `cssn > 0`. This can be easily fixed though in the `Scan` function (never drop below `0`). – Ronald Wildenberg Jul 27 '11 at 07:11
  • @Ronald - check out the query now. It solves the starting with `-1` issue. – Enigmativity Jul 27 '11 at 08:31
  • @mtyson - What's a "Kant"? – Enigmativity Oct 09 '16 at 03:51
  • Immanuel Kant, German philosopher with a reputation for extremely intelligent but dense writing: https://en.wikipedia.org/wiki/Immanuel_Kant – mtyson Oct 09 '16 at 15:28
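
On the question in the comments above about completing an observable built with Observable.FromEventPattern: the suggestion there was TakeWhile with a flag. A closely related option is TakeUntil, which completes the stream when a second observable fires instead of waiting for the next event to re-check a flag. The following is a rough sketch under assumptions: the Channel class, its StateChanged event, ChannelStateEventArgs and the StatesUntil helper are hypothetical stand-ins, since the real channel type is not shown in the thread.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Abbreviated enum; the real ChannelState has more members.
public enum ChannelState { Creating, Created, SettingUp, Operational }

// Hypothetical event args and channel type, for illustration only.
public class ChannelStateEventArgs : EventArgs
{
    public ChannelStateEventArgs(ChannelState state) { State = state; }
    public ChannelState State { get; private set; }
}

public class Channel
{
    public event EventHandler<ChannelStateEventArgs> StateChanged;

    // Test helper that raises the event.
    public void Raise(ChannelState state)
    {
        var handler = StateChanged;
        if (handler != null) handler(this, new ChannelStateEventArgs(state));
    }
}

public static class ChannelObservables
{
    // Wraps the event in an observable that completes as soon as `removed` produces a value.
    public static IObservable<ChannelState> StatesUntil(
        Channel channel, IObservable<Unit> removed)
    {
        return Observable
            .FromEventPattern<ChannelStateEventArgs>(
                h => channel.StateChanged += h,
                h => channel.StateChanged -= h)
            .Select(e => e.EventArgs.State)
            .TakeUntil(removed);
    }
}
```

With this shape, removing a channel amounts to calling OnNext on a Subject<Unit> that was passed in as `removed`; the completed state stream then drops out of any Merge it was part of, and the event handler is detached.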

Perhaps start with an IObservable<Channel> rather than starting w/ IEnumerable<Channel>. A way to do this would be to use a Subject<Channel>, and when a new one is created, OnNext() it.

If you need a list,

xsChannels.Subscribe(item => { lock(list) { list.Add(item); } });

Scott Weinstein

I promised to add the solution I came up with myself, so here it is. As long as I haven't found anything better I'll use this, although I still think there has to be a better way :)

I use a class that uses a ConcurrentDictionary to keep the latest value from each registered observable. When an observable is unregistered, its latest value is removed again, as well as the subscription associated with it.

When any registered observable generates a value, all latest values are collected and sent to a Subject.

public class DynamicCombineLatest<T>
{
    private readonly IDictionary<IObservable<T>, T> _latestValues =
        new ConcurrentDictionary<IObservable<T>, T>();
    private readonly IDictionary<IObservable<T>, IDisposable> _subscriptions =
        new ConcurrentDictionary<IObservable<T>, IDisposable>();
    private readonly ISubject<IEnumerable<T>> _result =
        new Subject<IEnumerable<T>>();

    public void AddObservable(IObservable<T> observable)
    {
        var subscription =
            observable.Subscribe(t =>
                                 {
                                     _latestValues[observable] = t;
                                     _result.OnNext(_latestValues.Values);
                                 });
        _subscriptions[observable] = subscription;
    }

    public void RemoveObservable(IObservable<T> observable)
    {
        _subscriptions[observable].Dispose();
        _latestValues.Remove(observable);
        _subscriptions.Remove(observable);
    }

    public IObservable<IEnumerable<T>> Result
    {
        get { return _result; }
    }
}
Ronald Wildenberg
  • Try to avoid `Add` and `Remove` functions in the Rx world. Have your `Add` function return the `IDisposable` that will perform the remove. Then you don't need to have a `Remove` function and you can lose the `_subscriptions` dictionary. – Enigmativity Jul 26 '11 at 13:42
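
The suggestion in the comment above could look roughly like the following. This is only a sketch of that idea, not a tested replacement: Add returns an IDisposable whose disposal undoes the registration, so neither a Remove method nor the _subscriptions dictionary is needed. As in the original class, removing an observable does not publish a new snapshot by itself; whether it should is a design choice left open here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public class DynamicCombineLatest<T>
{
    private readonly ConcurrentDictionary<IObservable<T>, T> _latestValues =
        new ConcurrentDictionary<IObservable<T>, T>();
    private readonly Subject<IEnumerable<T>> _result =
        new Subject<IEnumerable<T>>();

    // Registers an observable; disposing the returned handle unregisters it.
    public IDisposable Add(IObservable<T> observable)
    {
        var subscription = observable.Subscribe(t =>
        {
            _latestValues[observable] = t;
            // ConcurrentDictionary.Values returns a snapshot copy of the current values.
            _result.OnNext(_latestValues.Values);
        });

        return Disposable.Create(() =>
        {
            subscription.Dispose();
            T ignored;
            _latestValues.TryRemove(observable, out ignored);
        });
    }

    public IObservable<IEnumerable<T>> Result
    {
        get { return _result; }
    }
}
```

A caller keeps the handle returned by Add for each channel and disposes it when the channel goes away, for example `var registration = combiner.Add(channelStates);` followed later by `registration.Dispose();`.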