I'm trying to do a POC with numerous independent data feeds, a fairly classical observer-style application. The number of data feeds might vary from a few hundred to a few thousand, and the number of observers might vary from 2 to 20,000. Here's a quick mock-up of a simple data feed observable:
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public class FeedMockUp
{
    private readonly IScheduler observerScheduler;
    private readonly Random rnd = new Random((int)DateTime.Now.Ticks);
    private readonly Subject<double> sourceObservable;
    private readonly IObservable<double> testedObservable;

    public FeedMockUp(IScheduler observerScheduler)
    {
        this.observerScheduler = observerScheduler;
        sourceObservable = new Subject<double>();

        // Wrap the subject so that every subscriber gets its own
        // subscription to the underlying source.
        testedObservable =
            Observable.Create<double>(x =>
            {
                var underlyingSourceDisposable =
                    sourceObservable
                        .Subscribe(_ => x.OnNext(rnd.NextDouble()));

                return underlyingSourceDisposable;
            });
    }

    public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
    {
        int counter = 0;

        // Each subscriber observes on the injected scheduler; the composite
        // lets the whole batch be disposed in one go.
        var disposable = new CompositeDisposable();
        for (int i = 0; i < numberOfSubscribers; i++)
        {
            disposable.Add(testedObservable
                .ObserveOn(observerScheduler)
                .Subscribe(_ => Interlocked.Increment(ref counter)));
        }

        return disposable;
    }

    public void PushNewFeed()
    {
        sourceObservable.OnNext(rnd.NextDouble());
    }
}
While I was playing around with schedulers to improve the throughput of the observable updates, I noticed that with an EventLoopScheduler the memory consumption of an application with 100 data feeds and 1000 observers was quite stable: around 100 MB for 1000 observers, growing linearly when more observers were added.
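For completeness, the EventLoopScheduler run only differs in the scheduler that gets passed in; as far as I understand, EventLoopScheduler pumps all scheduled work on a single dedicated thread, so pending notifications queue up in one place. A minimal sketch of that wiring (the observer count here is arbitrary):

// EventLoopScheduler owns one dedicated thread and serializes all
// scheduled work onto it.
var rtScheduler = new EventLoopScheduler();

var feed = new FeedMockUp(rtScheduler);
using (feed.SubscribeToUnderlyingFeed(1000))
{
    feed.PushNewFeed();
}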
However, when I tried TaskPoolScheduler, an x86 process started throwing OutOfMemoryException, and on x64 memory consumption exploded, or rather became quite indeterminate, ranging anywhere from 1 GB to 2 GB for just 500 observers and growing almost exponentially with new observers in the mix.
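If my understanding is right (and it may well not be), ObserveOn gives every subscription its own serialized queue, so 100 feeds with 500 observers each means one round of pushes fans out into 100 × 500 = 50,000 work items hitting the task pool at once. A stripped-down way to take my mock-up out of the equation would be something like this (a sketch with an arbitrary subscriber count, same usings as above; I haven't profiled this exact version):

// Isolate the scheduler: one subject, many ObserveOn subscribers,
// no FeedMockUp involved.
var subject = new Subject<double>();
var subscriptions = new CompositeDisposable();

for (int i = 0; i < 50000; i++)
{
    subscriptions.Add(subject
        .ObserveOn(TaskPoolScheduler.Default)
        .Subscribe(_ => { }));
}

subject.OnNext(42.0);   // one push fans out to 50,000 task-pool queues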
Here's the code I've been using for testing. Can you see what's wrong with it? Why is there such a difference in behaviour between the two schedulers? My guess is that there's some internal copying/queuing involved, but it's only a guess. Ideally I'd like to find out what's happening here without diving into the Rx code base...
private static void Main(string[] args)
{
    const int displayItemCount = 100;
    const int callbackCount = 500;

    //var rtScheduler = new EventLoopScheduler();
    var rtScheduler = TaskPoolScheduler.Default;

    var rtFeeds = new List<FeedMockUp>();
    for (int i = 0; i < displayItemCount; i++)
    {
        var mockFeed = new FeedMockUp(rtScheduler);
        mockFeed.SubscribeToUnderlyingFeed(callbackCount);
        rtFeeds.Add(mockFeed);
    }

    // One push per feed, fanned out to every observer through ObserveOn.
    foreach (var rtFeedMockUp in rtFeeds)
    {
        rtFeedMockUp.PushNewFeed();
    }

    Console.WriteLine("Memory used for {0} feed mock-ups with {1} observers / callbacks: {2} MB",
        displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
    Console.ReadKey();
}
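One caveat I'm aware of with the measurement itself: Environment.WorkingSet reports the working set of the whole process, managed and native memory alike. A managed-heap-only reading, if that turns out to be more telling, could be taken with something like this (a small sketch, forcing a full collection first):

// Force a full collection so the figure reflects live managed objects,
// then report the managed heap size instead of the process working set.
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
Console.WriteLine("Managed heap: {0} MB", GC.GetTotalMemory(true) / (1024 * 1024));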