
I'm writing a batching pipeline that processes X outstanding operations every Y seconds. It feels like System.Reactive would be a good fit for this, but I'm not able to get the subscriber to execute in parallel. My code looks like this:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

var subject = new Subject<int>();

var concurrentCount = 0;

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Subscribe(list => 
    {
        var c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine("Executing {0} simultaneous batches", c); // This never gets printed, because Subscribe is only ever called on a single thread.
        Interlocked.Decrement(ref concurrentCount);
    });
    
Parallel.For(0, 1_000_000, i =>
{
    subject.OnNext(i);
});
subject.OnCompleted();

Is there an elegant way to read from this buffered Subject, in a concurrent manner?

Ceilingfish

2 Answers


The Rx subscription code always runs synchronously¹. What you need to do is move the processing out of the Subscribe delegate and make it a side effect of the observable sequence itself. Here is how it can be done:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

Subject<int> subject = new();
int concurrentCount = 0;

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        int c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

for (int i = 0; i < 1_000_000; i++)
{
    subject.OnNext(i);
}
subject.OnCompleted();

processor.Wait();

The Select+Observable.Defer+Observable.Start combination converts the source sequence to an IObservable<IObservable<Unit>>. It's a nested sequence, with each inner sequence representing the processing of one list. When the delegate of the Observable.Start completes, the inner sequence emits a Unit value and then completes. The wrapping Defer operator ensures that the inner sequences are "cold", so that they are not started before they are subscribed. Then follows the Merge operator, which unwraps the outer sequence to a flat IObservable<Unit> sequence. The maxConcurrent parameter configures how many of the inner sequences will be subscribed concurrently. Every time an inner sequence is subscribed by the Merge operator, the corresponding Observable.Start delegate starts running on a ThreadPool thread.
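
To see why the Defer wrapper is needed, here is a minimal standalone sketch (separate from the solution above) that contrasts a bare Observable.Start with a Defer-wrapped one:

using System;
using System.Reactive.Linq;
using System.Threading;

// Hot: the delegate is scheduled on the ThreadPool as soon as Observable.Start
// is called, before anyone has subscribed.
var hot = Observable.Start(() => Console.WriteLine("hot: started immediately"));

// Cold: the factory (and therefore the Observable.Start call) runs only when
// the sequence is subscribed, which is what lets Merge(maxConcurrent) throttle it.
var cold = Observable.Defer(() =>
    Observable.Start(() => Console.WriteLine("cold: started on subscription")));

Thread.Sleep(500); // "hot" has almost certainly printed by now; "cold" has not.

cold.Wait(); // Subscribing (via the blocking Wait) triggers the deferred Start.
hot.Wait();  // Completes immediately; the result of the hot Start is just replayed.

Without the Defer wrapper, each Observable.Start delegate would begin running the moment the Select projected it, and Merge(maxConcurrent: 2) would merely be throttling subscriptions to work that is already in flight.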

If you set maxConcurrent too high, the ThreadPool may run out of workers (in other words, it may become saturated), and the concurrency of your code will then depend on ThreadPool availability. If you wish, you can increase the number of workers that the ThreadPool creates instantly on demand by using the ThreadPool.SetMinThreads method. But if your workload is CPU-bound and you increase the worker threads above the Environment.ProcessorCount value, then most probably your CPU will be saturated instead.
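
As a rough illustration, raising the worker minimum might look like this (the 16 is an arbitrary example value, not a recommendation):

using System;
using System.Threading;

// Read the current minimums, so that the I/O completion-port value is preserved.
ThreadPool.GetMinThreads(out int minWorkers, out int minIocp);

// Ask the pool to create up to 16 worker threads instantly on demand instead of
// ramping up gradually. The 16 is just an illustrative number.
bool applied = ThreadPool.SetMinThreads(Math.Max(minWorkers, 16), minIocp);

Console.WriteLine($"SetMinThreads applied: {applied}");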

If your workload is asynchronous, you can replace the Observable.Defer+Observable.Start combo with the Observable.FromAsync operator, as shown below.
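
Here is a rough sketch of that asynchronous variant; ProcessBatchAsync is a hypothetical placeholder for your own asynchronous per-batch work:

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

Subject<int> subject = new();

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.FromAsync(() => ProcessBatchAsync(list)))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Same empty-source protection as in the synchronous version
    .ToTask();

// Feed the subject and call OnCompleted exactly as before, then await/Wait the processor.

// Hypothetical placeholder for the real asynchronous per-batch work.
static async Task ProcessBatchAsync(IList<int> batch)
{
    await Task.Delay(100); // stand-in for real asynchronous work
}

FromAsync, like Defer+Start, produces a cold inner sequence, so Merge can still limit how many batches are processed at a time.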

¹ An unpublished experimental library, AsyncRx.NET, plays with the idea of asynchronous subscriptions. It is based on the new IAsyncObservable<T> and IAsyncObserver<T> interfaces.

Theodor Zoulias

You say this:

// This never gets printed, because Subscribe is only ever called on a single thread.

It's just not true. The reason nothing gets printed is that the code in Subscribe runs in a serialized manner: only one thread at a time executes inside Subscribe, so you increment the value and then decrement it almost immediately. Since it starts at zero, it never gets a chance to rise above 1.

Now that's just the Rx contract: only one thread inside Subscribe at a time.

We can fix that.

Try this code:

using var reader = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .SelectMany(list =>
        Observable
            .Start(() =>
            {
                var c = Interlocked.Increment(ref concurrentCount);
                Console.WriteLine("Starting {0} simultaneous batches", c);
            })
            .Finally(() =>
            {
                var c = Interlocked.Decrement(ref concurrentCount);
                Console.WriteLine("Ending {0} simultaneous batches", c);
            }))
    .Subscribe();

Now when I run it (with fewer than the 1_000_000 iterations that you set) I get output like this:

Starting 1 simultaneous batches
Starting 4 simultaneous batches
Ending 3 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Starting 3 simultaneous batches
Ending 1 simultaneous batches
Ending 2 simultaneous batches
Starting 4 simultaneous batches
Starting 5 simultaneous batches
Ending 3 simultaneous batches
Starting 2 simultaneous batches
Starting 2 simultaneous batches
Ending 2 simultaneous batches
Starting 3 simultaneous batches
Ending 0 simultaneous batches
Ending 4 simultaneous batches
Ending 1 simultaneous batches
Starting 1 simultaneous batches
Starting 1 simultaneous batches
Ending 0 simultaneous batches
Ending 0 simultaneous batches
Enigmativity