I have a collection of observables that generate state changes for a so-called Channel. And I have a ChannelSet that should monitor those channels.
I would like to write something like this: if at least one channel is operational, the channel set is up; otherwise, the channel set is down.
IEnumerable<ChannelState> channelStates = ...;
if (channelStates.Any(cs => cs == ChannelState.Operational))
    channelSet.ChannelSetState = ChannelSetState.Up;
else
    channelSet.ChannelSetState = ChannelSetState.Down;
But where do I get my IEnumerable<ChannelState>? If I have one channel, I can simply subscribe to its state changes and modify the state of the channel set accordingly. For two channels, I could use CombineLatest:
Observable.CombineLatest(channel0States, channel1States, (cs0, cs1) =>
{
    // cs0 and cs1 are ChannelState values, so compare against ChannelState.Operational
    if (cs0 == ChannelState.Operational || cs1 == ChannelState.Operational)
        return ChannelSetState.Up;
    else
        return ChannelSetState.Down;
});
But I have an IEnumerable<Channel> and a corresponding IEnumerable<IObservable<ChannelState>>. I'm looking for something like CombineLatest that is not limited to a fixed number of observables.
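For the fixed-collection case, one possible direction is the CombineLatest overload that Rx.NET (System.Reactive 2.0 or later) provides for an IEnumerable of observables. The sketch below assumes that package is referenced. The names ChannelSetRules, Aggregate and Combine, and the ChannelState.Failed member, are hypothetical and only there to make the example compile.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Failed is a hypothetical second member; only Operational appears in the question.
public enum ChannelState { Operational, Failed }
public enum ChannelSetState { Up, Down }

public static class ChannelSetRules
{
    // The rule from the question: Up if at least one channel is operational.
    public static ChannelSetState Aggregate(IEnumerable<ChannelState> states) =>
        states.Any(cs => cs == ChannelState.Operational)
            ? ChannelSetState.Up
            : ChannelSetState.Down;

    // CombineLatest over an IEnumerable hands the selector an IList<ChannelState>
    // holding the latest value of every channel.
    public static IObservable<ChannelSetState> Combine(
        IEnumerable<IObservable<ChannelState>> channelStates) =>
        channelStates
            .CombineLatest(latest => Aggregate(latest))
            .DistinctUntilChanged(); // suppress repeated Up/Down notifications
}
```

Note that CombineLatest produces nothing until every source has emitted at least one value, so channels without an initial state would delay the first result. This sketch also enumerates the collection once, at subscription time, so it does not cover channels that are added later.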
To complicate matters, channels can be added to and removed from the collection over time. A newly added channel also generates state changes that need to be incorporated.
So what I'm actually looking for is a function:
IEnumerable<IObservable<ChannelState>> --> IObservable<ChannelSetState>
that stays up to date when the input collection changes. There should be some way to accomplish this using Rx, but I can't figure out how.
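To make the desired shape concrete, here is a rough sketch of one way it might be expressed, assuming System.Reactive is referenced and that the changing collection is itself exposed as an observable that emits the complete current list of channel streams after every add or remove. That membership stream, the DynamicChannelSet and Monitor names, and the ChannelState.Failed member are all assumptions, not part of the existing code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Failed is a hypothetical second member; only Operational appears in the question.
public enum ChannelState { Operational, Failed }
public enum ChannelSetState { Up, Down }

public static class DynamicChannelSet
{
    // membership (hypothetical): emits the full list of per-channel state
    // streams each time a channel is added or removed.
    public static IObservable<ChannelSetState> Monitor(
        IObservable<IReadOnlyList<IObservable<ChannelState>>> membership) =>
        membership
            .Select(channels => channels.Count == 0
                // With no channels there is nothing to combine: report Down.
                ? Observable.Return(ChannelSetState.Down)
                : channels.CombineLatest(states =>
                    states.Contains(ChannelState.Operational)
                        ? ChannelSetState.Up
                        : ChannelSetState.Down))
            .Switch()                 // drop the old combination when membership changes
            .DistinctUntilChanged();  // only report actual Up/Down transitions
}
```

The membership stream could, for example, be backed by a BehaviorSubject that is pushed a new list on every add or remove. One caveat: after each membership change the new CombineLatest starts from scratch, so the per-channel streams would need to replay their latest state on subscription (for instance via a BehaviorSubject or Replay(1)) for the result to update immediately.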