0

I'm trying to solve the following: a) subscriber receives events from IObservable for some time. Then it unsubscribes, do some stuff and then subscribe again. Here it should start receiving events from exactly the same point where unsubscription was performed. b) Such behavior is desirable for multiple subscribers model. E.g. when one has unsubscribed, others should continue receiving events.

Are there any suggestions from the RX side?

Thanks in advance!

borovikpe
  • 71
  • 1
  • 7
  • Your point "b)" tells me that you probably haven't quite understood Rx fully. Observables are not shared amongst multiple subscribers unless you explicitly `Publish` the observable or use subjects. (You should avoid subjects where possible.) You should consider every subscription forms a new distinct chain of observable operations back to the original source (or sources) of the observable. – Enigmativity Oct 11 '11 at 00:04

2 Answers2

1

Here's a reasonably simple Rx way to do what you want copied from my answer to this other question. I've created an extension method called Pausable that takes a source observable and a second observable of boolean that pauses or resumes the observable.

public static IObservable<T> Pausable<T>(
    this IObservable<T> source,
    IObservable<bool> pauser)
{
    return Observable.Create<T>(o =>
    {
        var paused = new SerialDisposable();
        var subscription = Observable.Publish(source, ps =>
        {
            var values = new ReplaySubject<T>();
            Func<bool, IObservable<T>> switcher = b =>
            {
                if (b)
                {
                    values.Dispose();
                    values = new ReplaySubject<T>();
                    paused.Disposable = ps.Subscribe(values);
                    return Observable.Empty<T>();
                }
                else
                {
                    return values.Concat(ps);
                }
            };

            return pauser.StartWith(false).DistinctUntilChanged()
                .Select(p => switcher(p))
                .Switch();
        }).Subscribe(o);
        return new CompositeDisposable(subscription, paused);
    });
}

It can be used like this:

var xs = Observable.Generate(
    0,
    x => x < 100,
    x => x + 1,
    x => x,
    x => TimeSpan.FromSeconds(0.1));

var bs = new Subject<bool>();

var pxs = xs.Pausable(bs);

pxs.Subscribe(x => { /* Do stuff */ });

Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Thread.Sleep(500);
bs.OnNext(true);
Thread.Sleep(5000);
bs.OnNext(false);
Community
  • 1
  • 1
Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • Thank you, I appreciate both approaches proposed by answers for my question. Handling IDisposable result to manage IObservable state more fits my task, so I marked the answer below as accepted. Your comment and answer give me more insight on Rx, very helpful, thanks again. – borovikpe Oct 11 '11 at 18:04
  • @Enigmativity It looks like this has a race condition. My test code: `var pxs = Observable.Interval(TimeSpan.FromMilliseconds(1)).Pausable(bs); var i = 0; pxs.Subscribe(pi => { if (i++ != pi) throw new Exception(); });` – Ilian Oct 11 '11 at 21:31
  • @IlianPinzon - I believe I fixed my race condition. Can you please check? – Enigmativity Oct 12 '11 at 01:01
  • Latest code throws an exception on the first value due to 0 != 34. I believe the race is in `paused.Disposable = ps.Subscribe(values);`. Since the stream is hot, that will miss a few items. – Ilian Oct 12 '11 at 01:11
0

It sounds like you need a "pausable" stream. Assuming that only 1 subscriber will handle the values at a time (while the other subscribers just wait), this solution is probably what you need.

Ilian
  • 5,113
  • 1
  • 32
  • 41
  • There were some issues with that post, as identified in this SO question: http://stackoverflow.com/questions/7620182/pause-and-resume-subscription-on-cold-iobservable – Anderson Imes Oct 10 '11 at 17:54