If I understand you correctly, you want to project a stream of MarketDepthLevel updates into a list of the latest bid/ask quantities for each level (in finance parlance, this is a type of ladder). The ladder is held as a ReactiveList bound to the UI. (ObserveOn may be required, although I believe ReactiveList handles this in most cases.)
Here's an example ladder snapped from http://ratesetter.com, where the "Price" is expressed as a percentage (Rate), and the bid/ask sizes by the amount lenders want and borrowers need at each price level:

At this point, I begin to get slightly lost. I'm confused as to why you need any further Rx operators, since you could simply Subscribe to the update stream as is and have the handler update a fixed list data-bound to a UI. Doesn't each new event simply need to be added to the ReactiveList if it's a new price, or replace an existing entry with a matching price? Imperative code to do this is fine if it's the last step in the chain to the UI.
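As a rough sketch of that imperative step: the helper below is written against IList<T> (which I believe ReactiveList<T> implements), and the MarketDepthLevel shape and the Upsert name are my own assumptions rather than anything from your code.

```csharp
using System.Collections.Generic;

// Hypothetical shape for a depth update; the real MarketDepthLevel may differ.
public sealed class MarketDepthLevel
{
    public decimal Price { get; set; }
    public decimal BidQuantity { get; set; }
    public decimal AskQuantity { get; set; }
}

public static class LadderListExtensions
{
    // Hypothetical helper: replaces the entry with a matching price, or
    // inserts the new level so the list stays sorted by ascending price.
    public static void Upsert(this IList<MarketDepthLevel> ladder, MarketDepthLevel update)
    {
        for (var i = 0; i < ladder.Count; i++)
        {
            if (ladder[i].Price == update.Price)
            {
                ladder[i] = update;       // existing price: replace in place
                return;
            }
            if (ladder[i].Price > update.Price)
            {
                ladder.Insert(i, update); // new price: insert before the next higher level
                return;
            }
        }
        ladder.Add(update);               // new highest price (or empty list)
    }
}
```

The Subscribe handler would then just call Upsert on the bound list for each incoming update. The linear scan is fine for a ladder with a handful of levels; a binary search would be the obvious change for a deep one.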
There is only value in doing this in the Rx stream itself if you need to convert the MarketDepthLevel stream into a stream of ladders. That could be useful, but you don't mention this need.
Such a need could be driven by the desire to multicast the stream to many subscribers, by further transformations or projections you need to make on the ladders, or both.
Bear in mind that if the ladder is large, working with whole-ladder updates in the UI might give you performance issues. In many cases, individual updates into a mutable structure like a ReactiveList are a practical way to go.
If working with a stream of ladders is a requirement, then look to Observable.Scan. This is the work-horse operator in Rx that maintains local state. It is used for any form of running aggregate, such as a running total or average, or in this case, a ladder.
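To make the running-aggregate idea concrete before getting to ladders, here is a minimal sketch of Scan producing a running total. It assumes the System.Reactive package is referenced; the ScanDemo class name is just for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class ScanDemo
{
    public static void Main()
    {
        var source = new[] { 1, 2, 3, 4 }.ToObservable();

        // Scan carries the accumulated value forward and emits it after
        // every element; the seed (0) itself is not emitted.
        source.Scan(0, (total, value) => total + value)
              .Subscribe(total => Console.WriteLine(total));
        // prints 1, 3, 6, 10 (one per line)
    }
}
```

Unlike Aggregate, which only yields a final value when the source completes, Scan emits every intermediate state, which is what you want for a stream that never ends.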
Now if all you need is the static list described above, I am potentially off on a massive detour here, but it's a useful discussion so...
You'll want to think carefully about the type used for the ladder aggregate, because you need to be conscious of how downstream events would be consumed. It's likely to need an immutable collection type so that things don't get weird for subscribers (each event should in effect be a static snapshot of the ladder). With immutable collections, it may be important to think about memory efficiency.
Here's a simple example of how a ladder stream might work, using an immutable collection from the NuGet package System.Collections.Immutable (pre-release at the time of writing):
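using System;
using System.Collections.Immutable;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Folds each depth update into a new immutable snapshot of the ladder, keyed by price.
    public static IObservable<ImmutableSortedDictionary<decimal, MarketDepthLevel>>
        ToLadder(this IObservable<MarketDepthLevel> source)
    {
        // The empty dictionary is the seed; SetItem adds a new price or replaces an existing one.
        return source.Scan(ImmutableSortedDictionary<decimal, MarketDepthLevel>.Empty,
            (lastLadder, depthLevel) =>
                lastLadder.SetItem(depthLevel.Price, depthLevel));
    }
}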
The ToLadder extension method uses an empty immutable ladder as the seed aggregate, and each successive MarketDepthLevel event produces a new, updated ladder. You may want to see if ImmutableSortedSet is sufficient.
You would probably want to wrap/project this into your own type, but hopefully you get the idea.
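For what wrapping it in your own type might look like, here is one possible sketch. The Ladder class, its With method, and the MarketDepthLevel shape are all hypothetical names of mine; the demo feeds a Subject by hand so you can see that earlier snapshots are untouched by later updates.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical shape for a depth update; the real MarketDepthLevel may differ.
public sealed class MarketDepthLevel
{
    public decimal Price { get; set; }
    public decimal BidQuantity { get; set; }
    public decimal AskQuantity { get; set; }
}

// Hypothetical wrapper: an immutable snapshot of the ladder.
public sealed class Ladder
{
    public static readonly Ladder Empty =
        new Ladder(ImmutableSortedDictionary<decimal, MarketDepthLevel>.Empty);

    private readonly ImmutableSortedDictionary<decimal, MarketDepthLevel> _levels;

    private Ladder(ImmutableSortedDictionary<decimal, MarketDepthLevel> levels)
    {
        _levels = levels;
    }

    // Levels in ascending price order.
    public IEnumerable<MarketDepthLevel> Levels { get { return _levels.Values; } }

    public int Count { get { return _levels.Count; } }

    // Returns a new snapshot; this instance is left untouched.
    public Ladder With(MarketDepthLevel level)
    {
        return new Ladder(_levels.SetItem(level.Price, level));
    }
}

public static class LadderDemo
{
    public static void Main()
    {
        var updates = new Subject<MarketDepthLevel>();
        var snapshots = new List<Ladder>();

        using (updates.Scan(Ladder.Empty, (ladder, level) => ladder.With(level))
                      .Subscribe(ladder => snapshots.Add(ladder)))
        {
            updates.OnNext(new MarketDepthLevel { Price = 5.1m, BidQuantity = 100m });
            updates.OnNext(new MarketDepthLevel { Price = 4.9m, AskQuantity = 250m });
            updates.OnNext(new MarketDepthLevel { Price = 5.1m, BidQuantity = 75m });
        }

        // Earlier snapshots are unaffected by later updates.
        Console.WriteLine(snapshots[0].Count);                      // 1
        Console.WriteLine(snapshots[2].Count);                      // 2
        Console.WriteLine(snapshots[0].Levels.First().BidQuantity); // 100
    }
}
```

Keeping the dictionary private means subscribers only ever see a read-only view, and you are free to swap the underlying collection later without touching them.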
Ultimately, this still leaves you with the challenge of updating a UI. As mentioned before, you are now stuck with the whole ladder, meaning you have to bind a whole ladder every time or convert it back to a stream of individual updates, and it's getting way too off topic to tackle that here...!