3

So I have the following RX change, but it seems to block on the select as if to preserve order. My understanding is that it should just keep delegating to the task pool?

var observable = Observable.Interval(TimeSpan.FromMilliseconds(10));

observable.ObserveOn(Scheduler.TaskPool)
    .Select(
    i =>
    {
        Console.WriteLine("Here" + System.Threading.Thread.CurrentThread.ManagedThreadId);
        System.Threading.Thread.Sleep(5000);
        return i;
    })
    .ObserveOn(Scheduler.TaskPool)
    .SubscribeOn(Scheduler.TaskPool)
    .Subscribe(i => { Console.WriteLine(i); });
Cheetah
  • 13,785
  • 31
  • 106
  • 190

1 Answers1

6

Ensuring events are delivered to a subscriber serially is a core part of the Rx grammar and fundamental to it's correct operation. It is enforced in most of the Rx operators and you should not violate this.

The mechanics of ObserveOn and SubscribeOn are addressed quite fully here.

The purpose of ObserveOn is to either avoid blocking the thread of the observable that is dispatching events and/or to control the thread on which subscribers receive events (in your case using the task pool to deliver them).

What it does not do is allow a subscriber to receive events concurrently - which as I said, would be in violation of the rules of Rx.

You might find this question on a similar topic worth reading too.

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • `Ensuring events delivered to a subscriber serially is a core part of the Rx grammar` - wow massive fail on my understanding - is there ever a instance that this is not the case? – Cheetah Jul 08 '14 at 18:31
  • Yes, usually when someone tried to implement `IObservable` themselves... :) But there's no correct way to do it. Note that concurrent delivery of a single event to multiple subscribers is routine. – James World Jul 08 '14 at 18:39
  • Sorry to be a pain, could you expand on this `Note that concurrent delivery of a single event to multiple subscribers is routine.`...are you talking about hot observables? – Cheetah Jul 08 '14 at 18:53
  • Sorry again to pile on questions - but is there any circumstance where Observables/RX acts like a queue for multiple subscribers? – Cheetah Jul 08 '14 at 19:04
  • 2
    @Cheetah - use the various `Subjects` or `Publish` or `Multicast` to create observables which can deliver to multiple subscribers. `Publish().RefCount().ObserveOn()` is a good way to concurrently deliver to multiple subscribers. Also, in your OP if you had used `SelectMany` instead of `Select` and returned `Observable.Return(i).Delay(TimeSpan.FromSeconds(5))` instead of `Thread.Sleep` you would have effectively removed the block and allows your sleeps to all run concurrently (and potentially be delivered out of order if you had something more complex that a constant delay). – Brandon Jul 08 '14 at 22:22