I have run into a problem: I want to stay subscribed to an observable stream while a predicate is true and unsubscribe while the predicate is false. When the predicate becomes true again at some point in the future, it should resubscribe to the observable stream.
Use case:
My input is an observable stream (IObservable<IList<LogEntity>> items). If I'm unable to insert the log entities into a database, it should unsubscribe from the stream, and when the database is back up and running it should automatically resubscribe (based on a property IsSubscribed) and start inserting the data again.
My own attempt:
I've already tried the following, which did NOT work:
    var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
                 where dataItem.Any()
                 select new { Type = dataItem.Key, List = dataItem.Select(o => o) };

    groups
        .TakeWhile(o => IsSubscribed)
        .SubscribeOn(_scheduler)
        .Repeat()
        .Subscribe(o => Insert(o.Type, o.List));
Based on the property IsSubscribed, I want the stream to subscribe and unsubscribe. When the TakeWhile predicate becomes false, OnCompleted gets called, and Subscribe won't work afterwards. Side note: it is a cold observable stream.
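To make the completion behaviour concrete, here is a minimal sketch of what I observe, assuming the System.Reactive package. Observable.Range is only a stand-in for my real stream, and the Do step that flips IsSubscribed is hypothetical, there purely to simulate the database going down partway through.

```csharp
using System;
using System.Reactive.Linq;

class Program
{
    // Stand-in for the real IsSubscribed property (assumption for this sketch).
    static bool IsSubscribed = true;

    static void Main()
    {
        // Cold stand-in source: every new subscription replays 1..5 from the start.
        var source = Observable.Range(1, 5);

        source
            // Hypothetical trigger: pretend the database goes down at item 3.
            .Do(x => { if (x == 3) IsSubscribed = false; })
            // TakeWhile completes the sequence the first time the predicate is false.
            .TakeWhile(_ => IsSubscribed)
            .Subscribe(
                x => Console.WriteLine($"OnNext {x}"),
                () => Console.WriteLine("OnCompleted"));
    }
}
```

Once the predicate returns false, the sequence completes and nothing further arrives, even if IsSubscribed is set back to true later. Because the source is cold, resubscribing (which is what Repeat does) starts it again from the beginning rather than resuming where it stopped.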
Question:
How can I create an observable stream that I can subscribe to and unsubscribe from as many times as I want (kind of like event handlers in C#)?
Thanks in advance for your help.