12

I am very excited about using Rx in a production application, where I will be listening to incoming notification updates coming from different channels.

I will be writing an Rx query on top of this stream, where I will be throttling using the .Window() operator. The subscriber (in my case an ActionBlock) will process this data in a blocking fashion (i.e. it will not spawn a Task from the ActionBlock). Keeping the above in mind, if data arrives at a much faster rate than my subscriber can consume it, what will happen to the incoming data? Does the Rx query use any buffer internally, and can it overflow?

user2757350
  • Do you absolutely need to process every single event, or is it possible to ignore some of them (even if that takes some fancy logic)? – cwharris Dec 18 '13 at 22:20
  • Since you're already using Dataflow as a consumer, why not also use it as a producer? It supports throttling pretty well. – svick Dec 19 '13 at 08:45
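
A minimal sketch of what svick's suggestion could look like, assuming the System.Threading.Tasks.Dataflow package. The block sizes, delays, and item counts are made up for illustration. With `BoundedCapacity` set, `SendAsync` does not complete until the block has room, so the producer is slowed down instead of an unbounded queue building up.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BoundedDataflowSketch
{
    static async Task Main()
    {
        // Slow consumer: handles one item at a time and blocks for 50 ms on each.
        // BoundedCapacity limits how many items may be waiting in the block.
        var consumer = new ActionBlock<int>(
            n => { Thread.Sleep(50); Console.WriteLine($"processed {n}"); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        for (int i = 0; i < 100; i++)
        {
            // SendAsync completes only once the block accepts the item,
            // so the producer is paced by the consumer.
            await consumer.SendAsync(i);
        }

        consumer.Complete();       // no more input
        await consumer.Completion; // wait for the remaining items to drain
    }
}
```

Nothing is dropped here; the trade-off is that the producer has to be able to wait.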

3 Answers

9

The phenomenon you're referring to is called Back Pressure, and the Rx team is currently exploring different ways to handle this situation. One solution might be communicating back-pressure back to the Observable so that it might "slow down".

To alleviate back-pressure, you could use lossy operators such as Throttle or Sample.
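
As a rough sketch of a lossy operator in use (assuming the System.Reactive package; the 10 ms source and 500 ms sampling period are arbitrary values chosen for the demo), `Sample` forwards only the most recent value in each period and drops the rest:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class LossyOperatorSketch
{
    static void Main()
    {
        // Hypothetical fast source: one value every 10 ms.
        IObservable<long> fast = Observable.Interval(TimeSpan.FromMilliseconds(10));

        // Sample emits the latest value seen in each 500 ms period.
        // Throttle behaves differently: it emits a value only after the source
        // has been quiet for the given time, so a source that never pauses
        // would produce nothing through Throttle.
        using (fast.Sample(TimeSpan.FromMilliseconds(500))
                   .Subscribe(n => Console.WriteLine($"sampled {n}")))
        {
            Thread.Sleep(2000); // let the demo run for a couple of seconds
        }
    }
}
```

This only helps when skipping intermediate values is acceptable.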

Timothy's answer is mostly right, but it is possible to have back-pressure occur on a single thread. This can happen if you use asynchronous code. In that sense, back-pressure is related to synchronization and scheduling, not threading (recall that by default Rx is single threaded).

If you run into a scenario where events are being produced faster than they can be consumed, and you're not using a lossy operator to alleviate the back-pressure, those items are usually being scheduled/queued/buffered, which can lead to a lot of memory allocation.

Personally, this has not been an issue for me, since usually events are processed faster than they are yielded, or loss of events is simply not an option, and therefore the extra memory consumption is inevitable.

cwharris
  • Thanks for the reply. "those items are usually being scheduled/queued/buffered, which can lead to a lot of memory allocation" - my earlier question was about exactly this. Where does this work get buffered (obviously in memory)? Do we as developers have a handle on this buffer, and can we dictate anything about how the messages are buffered? Thanks – user2757350 Dec 19 '13 at 13:15
  • You have to use a lossy operator as Chris says - have a look at this approach to relieving back-pressure for more info: http://www.zerobugbuild.com/?p=192 – James World Dec 19 '13 at 19:35
  • Exactly. You have four options. 1: Slow down the source, 2: buffer the events (back-pressure), 3: speed up the consumer, 4: lose some events. For the built-in operators, the only control you have over *how* events are scheduled is by injecting a specific `Scheduler`. That provides enough control for certain circumstances, but in my experience items are scheduled on a per-subscription basis, not a per-item basis. Some operators also have selector overloads, which might help in certain areas. If you need even more control than that, you may need to write custom operators. – cwharris Dec 19 '13 at 20:23
  • In my case I cannot afford to lose any incoming messages because they are position updates... – user2757350 Dec 20 '13 at 15:36
  • 1
    Are the positions absolute? If so, you could, in theory, lose some of the updates at a cost to smoothness/accuracy. We could help more if we knew exactly what the problem was. – cwharris Dec 20 '13 at 15:38
  • Chris, the positions are independent of each other; these are not tickers, and I cannot afford to use any lossy operators. It seems my only solution is to use a buffer that will store all incoming position updates. – user2757350 Jan 08 '14 at 03:43
  • You should consider whether the rate of incoming notifications is really that large. As Chris said, this isn't an issue that comes up that often. Usually when it does it's because of a slow consumer rather than a producer that's so fast nothing could possibly keep up with it. Examples of slow consumers are anything that involves network, database, UI, even local files, etc. If you're doing any of these in your observer, you should consider buffering the updates and merging the processing if you can't use lossy operators. Profiling the observer code is worthwhile too. – Niall Connaughton Aug 25 '15 at 00:32
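
The idea of buffering the updates and merging the processing, mentioned in the last comment, might look like the sketch below. It assumes the System.Reactive package; `SaveBatch`, the batch size of 100, and the one-second limit are hypothetical stand-ins for whatever the real observer does.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class BatchingSketch
{
    // Hypothetical batch handler, e.g. one database round trip per batch
    // instead of one per update.
    static void SaveBatch(IList<int> batch) =>
        Console.WriteLine($"saving {batch.Count} updates in one call");

    static void Main()
    {
        var updates = new Subject<int>();

        // Emit a batch after 100 updates or after one second, whichever comes first.
        using (updates.Buffer(TimeSpan.FromSeconds(1), 100)
                      .Where(batch => batch.Count > 0) // ignore empty time windows
                      .Subscribe(batch => SaveBatch(batch)))
        {
            for (int i = 0; i < 250; i++) updates.OnNext(i);
            updates.OnCompleted(); // ends the sequence; any partial batch is emitted
        }
    }
}
```

No update is lost; the observer simply pays its fixed per-call cost once per batch rather than once per item.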
5

This is actually down to the implementation of individual operators, but the built-in ones will buffer on a per-subscription basis - so a slow consumer won't block other subscribers, unless of course subscribers are sharing threads.

On a related note, Rx doesn't always protect the Rx grammar; for example, it is your responsibility to ensure you don't make concurrent calls to OnNext on a Subject. You can use Observable.Synchronize() to fix this.
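
A small sketch of the `Synchronize()` approach, assuming the System.Reactive package. The parallel loop and the item count are invented for the demo. Several threads call `OnNext` on the same subject, and `Synchronize()` serializes the notifications before they reach the observer:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class SynchronizeSketch
{
    static void Main()
    {
        var subject = new Subject<int>();
        int count = 0;

        // Synchronize() takes a lock around each notification, so the
        // non-thread-safe increment below never runs concurrently.
        using (subject.Synchronize().Subscribe(_ => count++))
        {
            // Hypothetical misbehaving producers: many threads pushing at once.
            Parallel.For(0, 10000, i => subject.OnNext(i));
        }

        Console.WriteLine($"observed {count} notifications");
    }
}
```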

James World
  • Does it internally create more than one instance of the subscriber, or is it always one? – user2757350 Dec 19 '13 at 16:38
  • 1
    Instead of "Subscriber", say "Observer". `IObservable`'s one `Subscribe` method accepts an `IObserver`. The overloads that accept delegates are in fact extension methods that create an anonymous observer behind the scenes - there is always one observer per subscription. – James World Dec 19 '13 at 19:37
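
To illustrate the point about observers, here is a short sketch assuming the System.Reactive package. The `Observable.Range` source is just a stand-in. It subscribes once with an explicit `IObserver<int>` and once with the delegate overload:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

class ObserverSketch
{
    static void Main()
    {
        IObservable<int> source = Observable.Range(1, 3);

        // Explicit observer passed to the interface's single Subscribe method.
        IObserver<int> observer = Observer.Create<int>(
            n => Console.WriteLine($"explicit: {n}"),
            () => Console.WriteLine("explicit: done"));
        using (source.Subscribe(observer)) { }

        // Delegate overload: an extension method that wraps the delegate
        // in an observer for this one subscription.
        using (source.Subscribe(n => Console.WriteLine($"delegate: {n}"))) { }
    }
}
```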
1

If the subscriber is processing on the same thread as the emitting observable, the data cannot come faster than the subscriber can consume.

IObservable<int> data = ...;
var subscription = data.Subscribe(n => Console.WriteLine(n));

In this example, each int emitted by data is written to the console before the next int is emitted.

If the subscription crosses threads, then the above doesn't hold.
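
A sketch of the cross-thread case, assuming the System.Reactive package. The use of `ObserveOn`, the 20 ms delay, and the item count are illustrative choices, not something the question specifies. Once notifications are handed to another scheduler, `OnNext` returns before the observer has finished, and pending items wait in a queue:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class CrossThreadSketch
{
    static void Main()
    {
        var source = new Subject<int>();
        int processed = 0;
        var done = new ManualResetEventSlim();

        // ObserveOn delivers notifications on the task pool; items that arrive
        // while the observer is busy are queued in memory.
        using (source.ObserveOn(TaskPoolScheduler.Default)
                     .Subscribe(
                         n => { Thread.Sleep(20); Interlocked.Increment(ref processed); },
                         () => done.Set()))
        {
            // The producer is no longer held back by the slow observer.
            for (int i = 0; i < 50; i++) source.OnNext(i);
            Console.WriteLine($"produced 50, processed so far: {Volatile.Read(ref processed)}");

            source.OnCompleted();
            done.Wait(); // queued items are still delivered before completion
            Console.WriteLine($"processed after draining: {Volatile.Read(ref processed)}");
        }
    }
}
```

The gap between the two printed counts is the backlog that the queue had to hold.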

Timothy Shields
  • So what you are saying is that the Rx query and the subscriber (i.e. observer) always run on the same thread? If I have an IObservable over a FileStream, how will that behave? The FileStream is independent of the IObservable and can deliver data whether or not the Rx query sitting on top of it is executing... – user2757350 Dec 18 '13 at 17:06
  • 2
    What you are talking about in this comment is the difference between hot and cold observables. Take a look at this question: http://stackoverflow.com/questions/2521277/what-are-the-hot-and-cold-observables. With a hot observable, you can miss data if you aren't subscribed, but if you are subscribed you should get all the data sent by the observable, even if that means OnNexts are buffered up on a slow observer. – James World Dec 18 '13 at 17:31
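
The hot/cold distinction from the comment above can be sketched roughly as follows, assuming the System.Reactive package. `Observable.Range` stands in for a cold source and a `Subject<int>` for a hot one:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class HotColdSketch
{
    static void Main()
    {
        // Cold: every subscription runs the sequence again from the start.
        IObservable<int> cold = Observable.Range(1, 3);
        cold.Subscribe(n => Console.WriteLine($"cold A: {n}"));
        cold.Subscribe(n => Console.WriteLine($"cold B: {n}"));

        // Hot: the subject pushes values whether or not anyone is listening.
        var hot = new Subject<int>();
        hot.OnNext(1); // nobody is subscribed yet, so this value is missed

        using (hot.Subscribe(n => Console.WriteLine($"hot: {n}")))
        {
            hot.OnNext(2); // delivered
            hot.OnNext(3); // delivered
        }
    }
}
```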