To solve this problem using exclusively Rx tools, ideally you would like to have something like this:
    source
        .GroupBy(item => item.Key)
        .Select(group => group.Select(
            item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
        .Merge(maxConcurrent: N)
        .Wait();
The inner Merge(1) would enforce the exclusive processing within each group, and the outer Merge(N) would enforce the global maximum concurrency policy. Unfortunately this doesn't work, because the outer Merge(N) restricts the subscriptions to the inner sequences (the IGroupedObservable<TKey, TElement>s), not to their individual elements. This is not what you want. The result will be that only the first N groups will be processed, and the elements of all other groups will be ignored. The GroupBy operator creates hot subsequences, and if you don't subscribe to them immediately you'll lose elements.
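The loss of elements can be reproduced in isolation. Below is a minimal sketch, assuming the System.Reactive package is referenced. The LostGroupsDemo class, the modulo key and the six-element input are made up for illustration and are not part of the question. It pushes the values 0 to 5 through GroupBy and Merge(2), so there are three groups but only two concurrent subscriptions:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, for illustration only.
public static class LostGroupsDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var source = new Subject<int>();

        source
            .GroupBy(x => x % 3)       // three groups, with keys 0, 1 and 2
            .Merge(maxConcurrent: 2)   // at most two groups are subscribed at a time
            .Subscribe(received.Add);

        // Everything below runs synchronously on the calling thread.
        for (int i = 0; i < 6; i++) source.OnNext(i);
        source.OnCompleted();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "0, 1, 3, 4"
    }
}
```

The values 2 and 5 belong to the third group. Merge(2) subscribes to that group only after one of the first two groups completes, and by then its elements have already been emitted with nobody listening.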
In order for the outer Merge(N) to work as desired, you'll have to merge freely all the inner sequences that are produced by the Observable.FromAsync, and have some other mechanism to serialize the processing of each group. One idea is to implement a special Select operator that emits an Observable.FromAsync only after the previous one has completed. Below is such an implementation, based on the Zip operator. The Zip operator internally maintains two hidden buffers, so that it can produce pairs from two sequences that might emit elements at different frequencies. This buffering is exactly what we need in order to avoid losing elements.
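    // Must be declared inside a static class, because it is an extension method.
    private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> selector)
    {
        // Starts with one signal, so that the first item is released immediately.
        var subject = new BehaviorSubject<Unit>(default);
        // The inner sequences may terminate on any thread, so serialize the signals.
        var synchronizedSubject = Observer.Synchronize(subject);
        return source
            .Zip(subject, (item, _) => item) // Pair each item with one signal
            .Select(item => selector(item).Do(
                _ => { },
                // Release the next item when this one fails or completes.
                _ => synchronizedSubject.OnNext(default),
                () => synchronizedSubject.OnNext(default)));
    }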
The BehaviorSubject<T> initially contains one element, so the first pair will be produced immediately. The second pair will not be produced before the first element has been processed. The same goes for the third pair and the second element, etc.
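The gating effect of Zip with a BehaviorSubject<T> can be observed on its own. The following is a minimal sketch, assuming the System.Reactive package is referenced. The ZipGateDemo class, the string items and the "|" marker are hypothetical and exist only for illustration. Three items are pushed before any extra signal is sent through the gate:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, for illustration only.
public static class ZipGateDemo
{
    public static List<string> Run()
    {
        var released = new List<string>();
        var items = new Subject<string>();
        // The gate holds one initial signal, like the subject in SelectOneByOne.
        var gate = new BehaviorSubject<Unit>(Unit.Default);

        items.Zip(gate, (item, _) => item).Subscribe(released.Add);

        items.OnNext("A"); // paired with the initial signal, released at once
        items.OnNext("B"); // buffered inside Zip, no signal available
        items.OnNext("C"); // buffered inside Zip as well
        released.Add("|"); // marker: everything before it was released without a new signal

        gate.OnNext(Unit.Default); // one more signal releases exactly one buffered item
        return released;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "A, |, B"
    }
}
```

The item "C" stays buffered instead of being dropped, and would be released by the next signal. In SelectOneByOne that signal is sent when the previous inner sequence fails or completes.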
You could then use the SelectOneByOne operator to solve the problem like this:
    source
        .GroupBy(item => item.Key)
        .SelectMany(group => group.SelectOneByOne(
            item => Observable.FromAsync(() => ProcessAsync(item))))
        .Merge(maxConcurrent: N)
        .Wait();
The above solution is presented only for the purpose of answering the question. I don't think that I would trust it in a production environment.