The Rx subscription code is always synchronous¹. What you need to do is to remove the processing code from the Subscribe
delegate, and make it a side-effect of the observable sequence. Here is how it can be done:
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Reactive.Threading.Tasks;
    using System.Threading;
    using System.Threading.Tasks;

    Subject<int> subject = new();
    int concurrentCount = 0;
    Task processor = subject
        .Buffer(TimeSpan.FromSeconds(1), 100)
        .Select(list => Observable.Defer(() => Observable.Start(() =>
        {
            // Process the list here; this body only measures the concurrency
            int c = Interlocked.Increment(ref concurrentCount);
            if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
            Interlocked.Decrement(ref concurrentCount);
        })))
        .Merge(maxConcurrent: 2)
        .DefaultIfEmpty() // Prevents exception in corner case (empty source)
        .ToTask(); // or RunAsync (either one starts the processor)
    for (int i = 0; i < 1_000_000; i++)
    {
        subject.OnNext(i);
    }
    subject.OnCompleted();
    processor.Wait();
The Select+Observable.Defer+Observable.Start combination converts the source sequence to an IObservable<IObservable<Unit>>. It's a nested sequence, with each inner sequence representing the processing of one list. When the delegate of the Observable.Start completes, the inner sequence emits a Unit value and then completes. The wrapping Defer operator ensures that the inner sequences are "cold", so that they are not started before they are subscribed. Then follows the Merge operator, which unwraps the outer sequence to a flat IObservable<Unit> sequence. The maxConcurrent parameter configures how many of the inner sequences will be subscribed concurrently. Every time an inner sequence is subscribed by the Merge operator, the corresponding Observable.Start delegate starts running on a ThreadPool thread.
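To see why the Defer wrapper matters, here is a minimal sketch (not part of the original pipeline) that compares a bare Observable.Start with one wrapped in Observable.Defer. The `started` counter and the two snapshot variables are names made up for this illustration, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

int started = 0;

// Bare Observable.Start: the delegate is scheduled right away,
// whether or not anyone ever subscribes.
IObservable<Unit> hot = Observable.Start(() =>
{
    Interlocked.Increment(ref started);
});

// Wrapped in Defer: the call to Observable.Start itself is postponed
// until the sequence is subscribed.
IObservable<Unit> cold = Observable.Defer(() => Observable.Start(() =>
{
    Interlocked.Increment(ref started);
}));

hot.Wait(); // Blocks until the first delegate has completed
int startedBeforeSubscribe = Volatile.Read(ref started);

cold.Wait(); // Subscribing triggers the deferred Observable.Start
int startedAfterSubscribe = Volatile.Read(ref started);

Console.WriteLine($"Before subscribing to cold: {startedBeforeSubscribe}");
Console.WriteLine($"After subscribing to cold: {startedAfterSubscribe}");
```

Without the Defer, every batch would start processing as soon as the Select projected it, and the maxConcurrent limit of the Merge would have no effect on the actual concurrency.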
If you set the maxConcurrent too high, the ThreadPool may run out of workers (in other words it may become saturated), and the concurrency of your code will then become dependent on the ThreadPool availability. If you wish, you can increase the number of workers that the ThreadPool creates instantly on demand, by using the ThreadPool.SetMinThreads method. But if your workload is CPU-bound, and you increase the worker threads above the Environment.ProcessorCount value, then most probably your CPU will be saturated instead.
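As a rough sketch of the ThreadPool.SetMinThreads adjustment, the snippet below raises only the worker-thread minimum and leaves the I/O completion-port minimum unchanged. The target of 8 workers is an arbitrary value picked for illustration, not a recommendation; a sensible number depends on your maxConcurrent setting and on whether the work is CPU-bound.

```csharp
using System;
using System.Threading;

// Read the current minimums so the I/O completion-port value is preserved.
ThreadPool.GetMinThreads(out int minWorker, out int minIo);

// Hypothetical target: at least 8 workers created on demand without delay.
int desiredWorkers = Math.Max(minWorker, 8);

// SetMinThreads returns false if the request is rejected
// (for example when it exceeds the configured maximum).
bool changed = ThreadPool.SetMinThreads(desiredWorkers, minIo);

Console.WriteLine(changed
    ? $"Minimum worker threads: {minWorker} -> {desiredWorkers}"
    : "The ThreadPool rejected the new minimum");
Console.WriteLine($"Logical processors: {Environment.ProcessorCount}");
```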
If your workload is asynchronous, you can replace the Observable.Defer+Observable.Start combo with the Observable.FromAsync operator, as shown here.
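A minimal sketch of the asynchronous variant follows, assuming the same Buffer and Merge settings as the pipeline above. ProcessBatchAsync and the `processed` counter are hypothetical stand-ins for real I/O-bound work. Observable.FromAsync does not invoke its delegate until the sequence is subscribed, so no Defer wrapper is needed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

int processed = 0;

// Hypothetical asynchronous handler; Task.Delay simulates I/O latency.
async Task ProcessBatchAsync(IList<int> batch)
{
    await Task.Delay(10);
    Interlocked.Add(ref processed, batch.Count);
}

Subject<int> subject = new();
Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    // The delegate runs only when Merge subscribes to the inner sequence
    .Select(batch => Observable.FromAsync(() => ProcessBatchAsync(batch)))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask();

for (int i = 0; i < 1_000; i++)
{
    subject.OnNext(i);
}
subject.OnCompleted();
await processor;

Console.WriteLine($"Processed {processed} items");
```

Here the maxConcurrent value limits the number of in-flight asynchronous operations rather than the number of busy ThreadPool threads, because no thread is blocked while a batch is awaiting.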
¹ An unpublished library exists, AsyncRx.NET, that plays with the idea of asynchronous subscriptions. It is based on the new interfaces IAsyncObservable<T> and IAsyncObserver<T>.