1

I have run into a problem, where I want to subscribe to an observable stream while a predicate is true and stop subscribing while the predicate is false. When the predicate at some point in the future is true again it should resubscribe to the observable stream.

Use case:

I have my observable stream as input (IObservable<IList<LogEntity>> items) if I'm unable to insert the log entities into a database it should unsubscribe to the stream, and when the database is back up running it should automatically subscribe to the stream (Based on a property IsSubscribed) and start inserting the data.

My own attempt:

I've already tried the following which DID NOT work:

var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
    where dataItem.Any()
    select new {Type = dataItem.Key, List = dataItem.Select(o => o)};

groups
    .TakeWhile(o => IsSubscribed)
    .SubscribeOn(_scheduler)
    .Repeat()
    .Subscribe(o => Insert(o.Type, o.List));

Based on the property IsSubscribed, I want to stream to subscribe and unsubscribe. When TakeWhile is true OnCompleted gets called, and when Subscribe won't work afterwards. Side note: It is an cold observable stream

Question:

How can I create an observable stream where I can subscribe and unsubscribe to as many times as I want (Kinda like event handlers in C#)

Thanks for helping in advance

leppie
  • 115,091
  • 17
  • 196
  • 297
SOK
  • 515
  • 6
  • 21
  • Replace `TakeWhile` with `Where`, and remove the `Repeat`. – Aron Feb 03 '16 at 18:16
  • If I do so, will it not just discard the log entities when the `Where` clause is false? – SOK Feb 03 '16 at 18:36
  • That IS the requirements I got from you... That is the same as if you were to unsubscribe to the Observable. – Aron Feb 03 '16 at 18:38
  • If I unsubscribe from a cold observable stream the whole stream will stop on unsubscribe. I want to be able to resubscribe to the stream again in the future. – SOK Feb 03 '16 at 18:42
  • But if you subscribe to a cold Observable you will receive all events from the start of the stream... – Aron Feb 03 '16 at 18:43

2 Answers2

2

Looks like a duplicate question.

However, pulling the code from Pause and Resume Subscription on cold IObservable, it could be adjusted to be

var subscription = Observable.Create<IObservable<YourType>>(o =>
{
    var current = groups.Replay();
    var connection = new SerialDisposable();
    connection.Disposable = current.Connect();

    return IsSubscribed
        .DistinctUntilChanged()
        .Select(isRunning =>
        {
            if (isRunning)
            {
                //Return the current replayed values.
                return current;
            }
            else
            {
                //Disconnect and replace current.
                current = source.Replay();
                connection.Disposable = current.Connect();
                //yield silence until the next time we resume.
                return Observable.Never<YourType>();
            }

        })
        .Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));

You can see Matt Barrett (and I) talk about it here. I recommend watching the whole video (maybe on 2x speed) to get the full context.

Community
  • 1
  • 1
Lee Campbell
  • 10,631
  • 1
  • 34
  • 29
  • Seems overly complicated...Really you just want to have a Monad that pauses execution, temporarily. `.Do()` will do that, (given the right scheduler (most of them)). – Aron Feb 04 '16 at 06:16
  • You are actually suggesting to just block a thread in the middle of an Rx query?! Seems completely against the spirit of Rx? – Lee Campbell Feb 04 '16 at 08:01
  • Actually I was proposing to use the duality of Task and Observables using the `System.Reactive.Threading.Tasks` namespace. Not blocking a thread, but using asynchrous waits from async/await. – Aron Feb 04 '16 at 08:08
  • Then you mean `SelectMany` which will loose the serialization constraints the Rx has, but Task does not. – Lee Campbell Feb 05 '16 at 02:05
1

What you want is to add groups .Delay(group.SelectMany(WaitForDatabaseUp))

public async Task WaitForDatabaseUp()
{
    //If IsSubscribed continue execution
    if(IsSubscribed) return;
    //Else wait until IsSubscribed == true
    await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
                       .Value()
                       .Where(isSubscribed => isSubscribed)
                       .Take(1);
}

Use your favourite framework to convert the INPC into an Observable where you see ObserveProperty()

Basically we put inline a Task that only returns when IsSubscribed == true. then turn that Task into an Observable, for compatability with Rx.

Aron
  • 15,464
  • 3
  • 31
  • 64
  • Downvoter care to explain the downvote... – Aron Feb 04 '16 at 06:13
  • As I read it (and as my quick test assures me) this code does nothing. The `Do` calls a method that returns a `Task`. The `Do` doesn't evaluate the task, so it is basically a no-op. Maybe it is supposed to be a filter or some sort of thread blocking thing. Maybe supporting tests or "a working sample" would help? – Lee Campbell Feb 04 '16 at 08:00
  • @LeeCampbell My bad, seems there isn't a `Observable.Do(Func)` that I thought there was... – Aron Feb 04 '16 at 08:09
  • @LeeCampbell A fiddle of the code is availble here https://dotnetfiddle.net/pV6eol Unfortunately due to a bug in Fiddle/Nuget, it doesn't compile on their service. But if you copy it into VS it should work fine. And no...we aren't blocking any threads at all. – Aron Feb 04 '16 at 08:42
  • Yes this makes more sense (`SelectMany` instead of `Do`). Leveraging the TaskPool is a nice trick. Perhaps a rename, makes this a useful operator. – Lee Campbell Feb 05 '16 at 01:35
  • Swapped to an upvote – Lee Campbell Feb 05 '16 at 01:36
  • Having just run this through the test from the duplicate question, this "kinda" works, but due to the `SelectMany` it looses the serialization of values. In my tests (from duplicate question), it fails. – Lee Campbell Feb 05 '16 at 02:04