I noticed that the Rx `Merge` operator accepts an optional `maxConcurrent` parameter. This can be used to limit the concurrency, by subscribing to only a limited number of subsequences at a time. It works perfectly when new subsequences are pushed at a slower rate than the subscribed subsequences complete, but it becomes problematic when new subsequences are pushed faster than that. What happens is that the pending subsequences are stored in an internal buffer that grows without bound, and also that each subsequence is older and older by the time it is finally subscribed. Here is a demonstration of this problem:
```csharp
await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);
```
A new subsequence is pushed every 10 milliseconds, and each subsequence completes after 1000 milliseconds. The subsequences are merged with maximum concurrency 1 (sequentially).
Output:
```
Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes
```
The memory usage grows steadily, and the time gap between the creation and subscription of each subsequence grows as well.
What I would like to have is a custom `Merge` variant that has an internal buffer with limited size. When the buffer is full, any incoming subsequence should cause the oldest buffered subsequence to be dropped. Here is a marble diagram of the desired behavior, configured with maximum concurrency = 1 and buffer capacity = 1:
```
Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|
```
- The subsequence A was subscribed immediately after it was emitted.
- Then B was emitted and was stored in the buffer, because A had not completed yet.
- Then C was emitted and replaced B in the buffer. As a result, the B subsequence was dropped and was never subscribed.
- The completion of the subsequence A was followed by the immediate subscription of the buffered subsequence C.
- The final result contains the merged values emitted by the A and C subsequences.
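To make the policy in the list above concrete, here is a minimal synchronous sketch of the bookkeeping only: which subsequence gets subscribed, which is buffered, and which is dropped. The `BoundedAdmission<T>` class and its `TryAdmit`/`TryTakeNext` members are hypothetical names that I made up for illustration. They are not part of Rx, the class is not thread-safe, and it does not subscribe to anything, so it is not the operator itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of Rx and not thread-safe.
// It only decides whether an emitted subsequence is subscribed, buffered, or dropped.
public sealed class BoundedAdmission<T>
{
    private readonly Queue<T> _buffer = new Queue<T>();
    private readonly int _maximumConcurrency;
    private readonly int _boundedCapacity;
    private int _active; // number of currently subscribed subsequences

    public BoundedAdmission(int maximumConcurrency, int boundedCapacity)
    {
        if (maximumConcurrency < 1)
            throw new ArgumentOutOfRangeException(nameof(maximumConcurrency));
        if (boundedCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(boundedCapacity));
        _maximumConcurrency = maximumConcurrency;
        _boundedCapacity = boundedCapacity;
    }

    // Called when the source emits a subsequence.
    // Returns true if the caller should subscribe to it immediately.
    public bool TryAdmit(T item)
    {
        if (_active < _maximumConcurrency)
        {
            _active++;
            return true;
        }
        // The buffer is full: drop the oldest buffered item to make room.
        if (_buffer.Count == _boundedCapacity) _buffer.Dequeue();
        _buffer.Enqueue(item);
        return false;
    }

    // Called when a subscribed subsequence completes.
    // Returns true and the next item to subscribe to, if one is buffered.
    public bool TryTakeNext(out T next)
    {
        if (_buffer.Count > 0)
        {
            // The freed slot is reused right away, so _active is unchanged.
            next = _buffer.Dequeue();
            return true;
        }
        _active--;
        next = default(T);
        return false;
    }
}
```

Replaying the marble diagram with `new BoundedAdmission<string>(1, 1)`: `TryAdmit("A")` returns `true` (subscribe now), `TryAdmit("B")` returns `false` (buffered), `TryAdmit("C")` returns `false` and evicts B, and when A completes `TryTakeNext(out var next)` returns `true` with `next == "C"`.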
How could I implement a custom Rx operator with this specific behavior? Here is the stub of the operator I am trying to implement:
```csharp
public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency,
    int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // TODO: enforce the boundedCapacity policy somehow
}
```