
I noticed that the Rx Merge operator accepts an optional maxConcurrent parameter. It can be used to limit concurrency, by subscribing concurrently to only a limited number of subsequences. It works perfectly when new subsequences are pushed at a slower rate than the subscribed subsequences complete, but it becomes problematic when new subsequences are pushed faster than that. The pending subsequences are stored in an internal buffer whose size grows without bound, and the subsequences that eventually get subscribed become older and older. Here is a demonstration of this problem:

using System;
using System.Reactive.Linq;

await Observable
    .Generate(0, _ => true, x => x, x => x, _ => TimeSpan.FromMilliseconds(10))
    .Select(_ => Observable
        .Return(DateTime.Now)
        .Do(d => Console.WriteLine(
            $"Then: {d:HH:mm:ss.fff}, " +
            $"Now: {DateTime.Now:HH:mm:ss.fff}, " +
            $"TotalMemory: {GC.GetTotalMemory(true):#,0} bytes"))
        .Delay(TimeSpan.FromMilliseconds(1000)))
    .Merge(maxConcurrent: 1)
    .Take(10);

A new subsequence is pushed every 10 milliseconds, and each subsequence completes after 1000 milliseconds. The subsequences are merged with maximum concurrency 1 (sequentially).

Output:

Then: 12:45:34.019, Now: 12:45:34.054, TotalMemory: 117,040 bytes
Then: 12:45:34.082, Now: 12:45:35.088, TotalMemory: 139,336 bytes
Then: 12:45:34.093, Now: 12:45:36.094, TotalMemory: 146,336 bytes
Then: 12:45:34.114, Now: 12:45:37.098, TotalMemory: 153,216 bytes
Then: 12:45:34.124, Now: 12:45:38.109, TotalMemory: 159,272 bytes
Then: 12:45:34.145, Now: 12:45:39.126, TotalMemory: 167,608 bytes
Then: 12:45:34.156, Now: 12:45:40.141, TotalMemory: 173,952 bytes
Then: 12:45:34.177, Now: 12:45:41.147, TotalMemory: 180,432 bytes
Then: 12:45:34.188, Now: 12:45:42.164, TotalMemory: 186,808 bytes
Then: 12:45:34.209, Now: 12:45:43.175, TotalMemory: 197,208 bytes


The memory usage grows steadily, and so does the time gap between the creation of each subsequence (Then) and its subscription (Now).
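To see why the gap grows linearly, here is a rough model of the demo in plain C# (a sketch, not Rx code). It assumes exactly one subsequence every 10 ms, exactly 1000 ms per subsequence, and a single concurrency slot that serves subsequences in arrival order; `SubscriptionTime` is a hypothetical helper written only for this illustration.

```csharp
using System;

// Hypothetical model: one subsequence is created every arrivalMs, each one
// runs for durationMs, and a single slot (maxConcurrent: 1) serves them
// strictly in arrival order. Returns when the k-th (0-based) is subscribed.
static long SubscriptionTime(int k, long arrivalMs, long durationMs)
{
    long slotFreeAt = 0; // moment the single concurrency slot becomes free
    long start = 0;
    for (int i = 0; i <= k; i++)
    {
        long createdAt = i * arrivalMs;          // the source emits every arrivalMs
        start = Math.Max(slotFreeAt, createdAt); // needs both the item and a free slot
        slotFreeAt = start + durationMs;         // the item occupies the slot
    }
    return start;
}

for (int n = 0; n < 10; n++)
{
    long created = n * 10L;
    long subscribed = SubscriptionTime(n, 10, 1000);
    Console.WriteLine($"#{n}: created at {created} ms, subscribed at {subscribed} ms, lag {subscribed - created} ms");
}
```

In this model the k-th subsequence is subscribed at k × 1000 ms although it was created at k × 10 ms, so the lag grows by 990 ms per item (8910 ms for the 10th one, roughly the ~9 seconds seen in the last output line). Every second about 100 subsequences are created but only one is started, so the buffer grows by about 99 items per second.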

What I would like is a custom Merge variant with a size-limited internal buffer. When the buffer is full, each incoming subsequence should cause the oldest buffered subsequence to be dropped. Here is a marble diagram of the desired behavior, configured with maximum concurrency = 1 and buffer capacity = 1:

Source: +----A------B------C------|
A:           +-------a----a---|
B:                  not-subscribed
C:                            +-----c----|
Result: +------------a----a---------c----|
  • The subsequence A was subscribed immediately after it was emitted.
  • Then B was emitted and was stored in the buffer, because A had not completed yet.
  • Then C was emitted and replaced B in the buffer. As a result, B was dropped and was never subscribed.
  • The completion of A was immediately followed by the subscription of the buffered C.
  • The final result contains the merged values emitted by A and C.
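The buffering policy on its own is just a drop-oldest bounded queue. Here is a minimal pure-C# sketch of it, with strings standing in for subsequences and a hypothetical `Offer` helper, assuming capacity 2 and that B, C, D and E arrive while all concurrency slots are busy:

```csharp
using System;
using System.Collections.Generic;

// Drop-oldest policy: enqueue the incoming item; if that exceeds the
// capacity, evict and return the oldest item (otherwise return null).
static string Offer(Queue<string> buffer, int capacity, string incoming)
{
    buffer.Enqueue(incoming);
    return buffer.Count > capacity ? buffer.Dequeue() : null;
}

var pending = new Queue<string>();
foreach (var item in new[] { "B", "C", "D", "E" })
{
    string dropped = Offer(pending, 2, item);
    if (dropped != null) Console.WriteLine($"{item} arrived, {dropped} dropped");
}
Console.WriteLine(string.Join(", ", pending)); // D, E
```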

How could I implement a custom Rx operator with this specific behavior? Here is the stub of the operator I am trying to implement:

public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency,
    int boundedCapacity)
{
    return source.Merge(maximumConcurrency);
    // TODO: enforce the boundedCapacity policy somehow
}
Theodor Zoulias

2 Answers


I came up with a functional solution. I'm not sure it's the way to go, simply because of its complexity, but I think I covered all the bases.

First, if you take a functional approach, this is a relatively simple state-machine problem. The state needs to know how many observables are currently executing, plus the buffer queue. Two events can affect the state: a new observable arriving (it starts immediately if a slot is free, otherwise it is enqueued on the buffer queue), and a currently executing observable terminating (the oldest buffered observable is dequeued, or a slot is freed if the buffer is empty).

Since a state machine basically means Scan, and Scan works with only one input type, we have to coerce our two events into one type, which I called Message below. The state machine then knows everything and can take over the job of the Merge(n) overload, so a plain Merge() suffices at the end.

The last trick is the loop-back: since the completing observable is 'downstream' of Scan, we need to loop its termination back into Scan. For that, I always refer back to the Drain function from this question: https://stackoverflow.com/questions/4123178/a-way-to-push-buffered-events-in-even-intervals/
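The state machine described above can be modeled without Rx as a plain transition function. The following is a hypothetical, simplified model of the Scan accumulator, not the code below: strings stand in for the subsequences, a message is a `(bool IsEnqueue, string Item)` tuple, the completion messages play the role of the looped-back signal, and it uses a mutable `Queue<string>` and its `Count` instead of a separate buffer counter. Replaying the marble diagram from the question with maximumConcurrency 1 and boundedCapacity 1 starts only A and C:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical transition function modeling the Scan accumulator.
// Returns the new running count and the subsequence to start now (or null).
static (int executing, string toStart) Step(
    Queue<string> buffer, int executing, (bool IsEnqueue, string Item) message,
    int maximumConcurrency, int boundedCapacity)
{
    if (message.IsEnqueue)
    {
        // A slot is free: start the new subsequence immediately.
        if (executing < maximumConcurrency)
            return (executing + 1, message.Item);
        // All slots busy: buffer it, evicting the oldest if over capacity.
        buffer.Enqueue(message.Item);
        if (buffer.Count > boundedCapacity)
            buffer.Dequeue();
        return (executing, null);
    }
    // A running subsequence completed (the looped-back message).
    if (buffer.Count == 0)
        return (executing - 1, null);     // nothing waiting: free the slot
    return (executing, buffer.Dequeue()); // hand the slot to the oldest buffered
}

// Replay of the marble diagram: A, B, C arrive, then A and C complete.
var pending = new Queue<string>();
int running = 0;
var started = new List<string>();
(bool, string)[] messages =
{
    (true, "A"), (true, "B"), (true, "C"), (false, null), (false, null)
};
foreach (var m in messages)
{
    var (nowRunning, toStart) = Step(pending, running, m, 1, 1);
    running = nowRunning;
    if (toStart != null) started.Add(toStart);
}
Console.WriteLine(string.Join(", ", started)); // A, C
```

With boundedCapacity 0 the same function drops every subsequence that arrives while all slots are busy, which is how the Enqueue branch of the Scan below behaves for that value.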

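using System;
using System.Collections.Immutable;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class X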
{
    public static IObservable<T> MergeBounded<T>(
        this IObservable<IObservable<T>> source,
        int maximumConcurrency,
        int boundedCapacity)
    {
        return Observable.Defer(() =>
        {
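            // Loop-back channel: signals that a running subsequence has completed.
            var capacityQueue = new Subject<Unit>();

            var toReturn = source.Publish(_source => _source
                // Both kinds of events are coerced into a single Message type for Scan.
                .Select(o => Message.Enqueue(o))
                .Merge(capacityQueue.Select(_ => Message.Dequeue(Observable.Empty<T>())))
                // State: buffered count, buffer, running count, and the subsequence to start now (or null).
                .Scan((bufferCount: 0, buffer: ImmutableQueue<IObservable<T>>.Empty, executionCount: 0, item: (IObservable<T>)null), (state, message) =>
                {
                    var buffer = state.buffer;
                    var bufferCount = state.bufferCount;
                    var executionCount = state.executionCount;
                    if (message.IsEnqueue)
                    {
                        // A slot is free (so the buffer is empty): start the new subsequence now.
                        if (executionCount < maximumConcurrency)
                            return (0, ImmutableQueue<IObservable<T>>.Empty, executionCount + 1, message.Object);

                        // All slots busy: buffer it, dropping the oldest buffered one if the buffer is full.
                        buffer = buffer.Enqueue(message.Object);
                        if (bufferCount == boundedCapacity)
                            buffer = buffer.Dequeue();
                        else
                            bufferCount++;
                        return (bufferCount, buffer, executionCount, null);
                    }
                    else
                    {
                        // A running subsequence completed: free the slot, or hand it to the oldest buffered one.
                        if (bufferCount == 0)
                            return (0, buffer, executionCount - 1, null);
                        else
                            return (bufferCount - 1, buffer.Dequeue(), executionCount, buffer.Peek());
                    }
                })
                .Where(t => t.item != null)
                .Select(t => t.item)
                // Report each completion back into Scan through capacityQueue.
                .Select(o => o.Do(_ => { }, () => capacityQueue.OnNext(Unit.Default)))
                // capacityQueue never completes, so end the outer stream when the source terminates.
                .TakeUntil(_source.IgnoreElements().Materialize())
                .Merge()
            );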

            return toReturn;
        });

    }

    public class Message
    {
        public static Message<T> Enqueue<T>(T t)
        {
            return Message<T>.Enqueue(t);
        }

        public static Message<T> Dequeue<T>(T t)
        {
            return Message<T>.Dequeue(t);
        }

    }

    public class Message<T>
    {
        private readonly T _t;
        private readonly bool _isEnqueue;
        private Message(bool isEnqueue, T t)
        {
            _t = t;
            _isEnqueue = isEnqueue;
        }
        
        public static Message<T> Enqueue(T t)
        {
            return new Message<T>(true, t);
        }

        public static Message<T> Dequeue(T t)
        {
            return new Message<T>(false, t);
        }
        
        public bool IsEnqueue => _isEnqueue;
        public T Object => _t;
    }
}

I wrote some test code (based on the original question) to verify it, in case you want to piggyback off of it. The test now passes:

//              T: 0123456789012345678901234567890123
//            T10: 0         1         2         3
//         Source: +----A------B------C------|
//              A:      +-------a----a---|
//              B:             +----------b----b---|
//              C:                    +--------c----|
// ExpectedResult: +------------a----a---------c----|


var ts = new TestScheduler();

var A = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnCompleted<string>(22 * TimeSpan.TicksPerSecond)
);
var B = ts.CreateHotObservable(
    ReactiveTest.OnNext(23 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "b"),
    ReactiveTest.OnCompleted<string>(32 * TimeSpan.TicksPerSecond)
);
var C = ts.CreateHotObservable(
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
var source = ts.CreateHotObservable(
    ReactiveTest.OnNext(5 * TimeSpan.TicksPerSecond, A.AsObservable()),
    ReactiveTest.OnNext(12 * TimeSpan.TicksPerSecond, B.AsObservable()),
    ReactiveTest.OnNext(19 * TimeSpan.TicksPerSecond, C.AsObservable()),
    ReactiveTest.OnCompleted<IObservable<string>>(26 * TimeSpan.TicksPerSecond)
);
var observer = ts.CreateObserver<string>();
var testResult = source.MergeBounded(1, 1);
testResult.Subscribe(observer);

var expected = ts.CreateHotObservable(
    ReactiveTest.OnNext(13 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(18 * TimeSpan.TicksPerSecond, "a"),
    ReactiveTest.OnNext(28 * TimeSpan.TicksPerSecond, "c"),
    ReactiveTest.OnCompleted<string>(33 * TimeSpan.TicksPerSecond)
);
ts.Start();
//observer.Messages.Dump("Actual");   // Linqpad
//expected.Messages.Dump("Expected"); // Linqpad
ReactiveAssert.AreElementsEqual(expected.Messages, observer.Messages);

(Test code passes without exception)

Shlomo
  • Edited: I fixed the termination problem. – Shlomo Dec 31 '20 at 14:22
  • All subsequences that are run are subscribed twice because of the terminator. Subsequences that aren’t are run once. – Shlomo Jan 01 '21 at 10:50
  • Edited: Terminator now only subscribing to published sequence, not subsequence. – Shlomo Jan 01 '21 at 11:02
  • Yeah! Now it works perfectly. Thanks Shlomo! I customized the implementation with a couple of modifications: (1) I replaced the `Message` class with a `ValueTuple`, to make the implementation more compact, and (2) I replaced the `ImmutableQueue` with a `Queue`, which has a `Count` property and makes it redundant to include the `bufferCount` in the `Scan`'s state. I could post my mod as a separate answer if anyone is interested. – Theodor Zoulias Jan 01 '21 at 14:29
  • I would be careful with using mutable collections. If not properly set up, they do funny things with multiple subscriptions. But otherwise makes sense. – Shlomo Jan 01 '21 at 20:15
  • This `Queue` is under complete control by the `Scan` operator. It seems fairly safe IMHO, in this particular case at least. Btw the link to the [question](https://stackoverflow.com/questions/4123178/a-way-to-push-buffered-events-in-even-intervals/) that contains the `Drain` operator is broken after the latest edit. – Theodor Zoulias Jan 01 '21 at 20:26
  • Btw somewhere I read that the `Drain` operator was removed from the library, because its functionality was duplicated by the `Concat` operator. – Theodor Zoulias Jan 02 '21 at 20:31

Here is another implementation. It is not as feature-complete as Shlomo's solution, because it can't be configured with boundedCapacity: 0; the internal buffer must have a capacity of at least 1.

/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, limiting the number of concurrent subscriptions to inner sequences.
/// The unsubscribed inner sequences are stored in a buffer with the specified
/// maximum capacity. When the buffer is full, the oldest inner sequence in the
/// buffer is dropped and ignored in order to make room for the latest inner
/// sequence.
/// </summary>
public static IObservable<T> MergeBounded<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency, int boundedCapacity)
{
    if (boundedCapacity < 1)
        throw new ArgumentOutOfRangeException(nameof(boundedCapacity));

    return Observable.Defer(() =>
    {
        var queue = new Queue<IObservable<T>>(boundedCapacity);
        return source
            .Select(inner =>
            {
                bool oldestDropped = false;
                lock (queue)
                {
                    if (queue.Count == boundedCapacity)
                    {
                        queue.Dequeue(); oldestDropped = true;
                    }
                    queue.Enqueue(inner);
                }
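                // The number of pending Defer placeholders always equals
                // queue.Count, so a drop needs no new placeholder.
                if (oldestDropped) return null;
                // Placeholder: when Merge eventually subscribes to it,
                // it takes the oldest subsequence still in the queue.
                return Observable.Defer(() =>
                {
                    lock (queue) return queue.Dequeue();
                });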
            })
            .Where(inner => inner != null)
            .Merge(maximumConcurrency);
    });
}

This implementation is based on the assumption that the built-in Merge operator never subscribes twice to the same subsequence. Otherwise the queue.Dequeue() call inside the Defer could be invoked on an empty queue and throw an InvalidOperationException.
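To see why the Defer wrappers and the queue stay in step, here is a single-threaded model of the same idea without Rx. It is a hypothetical sketch: strings stand in for the inner sequences, invoking a `Func<string>` placeholder stands in for Merge subscribing to a Defer wrapper, the `placeholders` queue stands in for Merge's internal buffer, locks are omitted, and `OnInner`, `TryStart` and `OnInnerCompleted` are names made up for this illustration. It replays the marble diagram from the question:

```csharp
using System;
using System.Collections.Generic;

// Single-threaded model of the Defer-placeholder idea (locks omitted).
const int maximumConcurrency = 1;
const int boundedCapacity = 1;
var queue = new Queue<string>(boundedCapacity);
var placeholders = new Queue<Func<string>>(); // stands in for Merge's own buffer
var subscribed = new List<string>();
int running = 0;

void OnInner(string inner)
{
    bool oldestDropped = false;
    if (queue.Count == boundedCapacity) { queue.Dequeue(); oldestDropped = true; }
    queue.Enqueue(inner);
    // Pending placeholders always equal queue.Count, so a drop needs no new one.
    if (!oldestDropped) placeholders.Enqueue(() => queue.Dequeue());
    TryStart();
}

void TryStart()
{
    // "Subscribe" to pending placeholders while a concurrency slot is free.
    while (running < maximumConcurrency && placeholders.Count > 0)
    {
        running++;
        subscribed.Add(placeholders.Dequeue()()); // resolves to the current front
    }
}

void OnInnerCompleted()
{
    running--;
    TryStart();
}

// A, B, C arrive; then A completes (C starts) and C completes.
OnInner("A");
OnInner("B");
OnInner("C");
OnInnerCompleted();
OnInnerCompleted();
Console.WriteLine(string.Join(", ", subscribed)); // A, C
```

With boundedCapacity set to 0, the very first OnInner call would hit Dequeue on an empty queue, which is why the operator above rejects that value.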

Theodor Zoulias