
I'm experimenting with System.Reactive, trying to solve the following task:

  • Break an incoming stream of strings into groups
  • Items in each group must be processed asynchronously and sequentially
  • Groups must be processed in parallel
  • No more than N groups must be processed at the same time
  • Ideally, without using synchronization primitives

Here is the best I've come up with so far:

TaskFactory taskFactory = new (new LimitedConcurrencyLevelTaskScheduler(2));
TaskPoolScheduler scheduler = new (taskFactory);
source
    .GroupBy(item => item)
    .SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).ObserveOn(scheduler).Concat())
    .Subscribe();

Any idea how to achieve this without a scheduler? I couldn't make it work via Merge().

Kiryl
  • Somewhat related question: [Dynamically processing a concurrent collection in parallel by group but serially within each group](https://stackoverflow.com/questions/71000722/dynamically-processing-a-concurrent-collection-in-parallel-by-group-but-serially). Same problem, but not using Rx to solve it. – Theodor Zoulias May 03 '22 at 15:51
  • Could you clarify this requirement? *"No more than N groups must be processed at the same time"*. Assuming for example that N=1, does this mean that the last element of the group A must be processed before processing the first element of the group B? Or does it mean that at any given moment only one element, either from the group A or from the group B, can be processed? In other words, does this limitation apply to the grouped subsequences, or to their individual elements? – Theodor Zoulias May 03 '22 at 18:06
  • @TheodorZoulias "Or it means that at any given moment only one element, either from the group A or from the group B, can be processed" - this one. Also with the constraint that elements from the same group are processed sequentially (N=2, group A has 5 elements, no group B: processing handles A.0, then A.1, then A.2 ...) – Kiryl May 03 '22 at 18:37

2 Answers


The easiest way to enforce the "No more than N groups must be processed at the same time" limitation is probably to use a SemaphoreSlim. So instead of this:

.SelectMany(g => g.Select(item => Observable.FromAsync(() => onNextAsync(item))).Concat())

...you can do this:

var semaphore = new SemaphoreSlim(N, N);

//...

.SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
{
    await semaphore.WaitAsync();
    try { return await onNextAsync(item); }
    finally { semaphore.Release(); }
})).Merge(1))

By the way, in the current Rx version (5.0.0) I don't trust the Concat operator, so I prefer to use Merge(1) instead.
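To see the semaphore approach end to end, here is a minimal sketch written as a top-level C# program. It assumes the System.Reactive NuGet package is referenced. The `OnNextAsync` body, the sample items, the grouping by first character, and the counters (`running`, `peak`, `processed`) are all hypothetical stand-ins added only to observe the behavior; they are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

const int N = 2; // assumed global concurrency limit for this sketch
var semaphore = new SemaphoreSlim(N, N);
var processed = new ConcurrentQueue<string>(); // completion order, for inspection
int running = 0, peak = 0;
var peakLock = new object();

// Hypothetical stand-in for the question's onNextAsync.
async Task<string> OnNextAsync(string item)
{
    int now = Interlocked.Increment(ref running);
    lock (peakLock) { peak = Math.Max(peak, now); } // remember the highest overlap seen
    await Task.Delay(30);                           // simulated asynchronous work
    processed.Enqueue(item);
    Interlocked.Decrement(ref running);
    return item;
}

// Sample input: the first character is used as the group key (an assumption).
var source = new[] { "a1", "b1", "a2", "c1", "b2", "a3", "c2" }.ToObservable();

var results = await source
    .GroupBy(item => item[0])
    .SelectMany(g => g.Select(item => Observable.FromAsync(async () =>
    {
        await semaphore.WaitAsync();            // global limit across all groups
        try { return await OnNextAsync(item); }
        finally { semaphore.Release(); }
    })).Merge(1))                               // one item at a time within a group
    .ToList();

Console.WriteLine($"processed {results.Count} items, peak concurrency {peak}");
Console.WriteLine(string.Join(", ", processed));
```

The semaphore is acquired inside the `FromAsync` delegate, so every group is subscribed immediately and no elements are dropped; only the actual work is throttled.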

Theodor Zoulias
  • Thanks! Although I'm still wondering if there is a way to solve it "purely" via Rx rather than bringing sync primitives into the equation – Kiryl May 03 '22 at 18:58
  • @Kiryl it might be possible. If it is, it's likely to be much more complicated. You could look at these two questions for ideas: [1](https://stackoverflow.com/questions/65477010/how-to-merge-a-nested-observable-iobservableiobservablet-with-limited-concur), [2](https://stackoverflow.com/questions/64841312/how-to-merge-multiple-observables-with-order-preservation-and-maximum-concurrenc) – Theodor Zoulias May 03 '22 at 19:32

To solve this problem using exclusively Rx tools, ideally you would like to have something like this:

source
    .GroupBy(item => item.Key)
    .Select(group => group.Select(
        item => Observable.FromAsync(() => ProcessAsync(item))).Merge(1))
    .Merge(maxConcurrent: N)
    .Wait();

The inner Merge(1) would enforce the exclusive processing within each group, and the outer Merge(N) would enforce the global maximum concurrency policy. Unfortunately this doesn't work, because the outer Merge(N) restricts the subscriptions to the inner sequences (the IGroupedObservable<T>s), not to their individual elements. This is not what you want. The result is that only the first N groups will be processed, and the elements of all other groups will be ignored. The GroupBy operator creates hot subsequences, and if you don't subscribe to them immediately you'll lose elements.
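The element loss described above can be reproduced with a small sketch. It assumes the System.Reactive NuGet package, uses N = 1 to make the effect obvious, and uses a hypothetical `ProcessAsync` plus sample items keyed by their first character; none of these come from the original code.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for ProcessAsync.
async Task<string> ProcessAsync(string item)
{
    await Task.Delay(20); // simulated asynchronous work
    return item;
}

// Three groups (a, b, c), emitted synchronously on subscription.
var source = new[] { "a1", "b1", "c1", "a2", "b2", "c2" }.ToObservable();

var results = await source
    .GroupBy(item => item[0])
    .Select(group => group
        .Select(item => Observable.FromAsync(() => ProcessAsync(item)))
        .Merge(1))
    .Merge(1) // N = 1: only one group is subscribed at a time
    .ToList();

// Groups b and c are subscribed only after group a has finished. By then the
// source has already pushed their elements to nobody, so they never show up.
Console.WriteLine(string.Join(", ", results));
```

In this setup the source pushes every element before the first delay elapses, so groups that are still waiting in the outer Merge queue miss all of their elements.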

In order for the outer Merge(N) to work as desired, you'll have to freely merge all the inner sequences that are produced by Observable.FromAsync, and have some other mechanism to serialize the processing of each group. One idea is to implement a special Select operator that emits an Observable.FromAsync only after the previous one is completed. Below is such an implementation, based on the Zip operator. The Zip operator internally maintains two hidden buffers, so that it can produce pairs from two sequences that might emit elements at different frequencies. This buffering is exactly what we need in order to avoid losing elements.

private static IObservable<IObservable<TResult>> SelectOneByOne<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> selector)
{
    var subject = new BehaviorSubject<Unit>(default);
    var synchronizedSubject = Observer.Synchronize(subject);
    return source
        .Zip(subject, (item, _) => item)
        .Select(item => selector(item).Do(
            _ => { },
            _ => synchronizedSubject.OnNext(default),
            () => synchronizedSubject.OnNext(default)));
}

The BehaviorSubject<T> initially contains one element, so the first pair will be produced immediately. The second pair will not be produced before the first element has been processed. The same holds for the third pair and the second element, and so on.
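The gating described above can be observed in isolation, without any asynchronous work. The following sketch assumes the System.Reactive NuGet package; the `items` subject, the `gate` name, and the manual `gate.OnNext` calls are illustrative stand-ins for a group and for the completion signals of the processed elements.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var items = new Subject<string>();                  // stands in for one group
var gate = new BehaviorSubject<Unit>(Unit.Default); // starts with one "permit"
var released = new List<string>();

using var subscription = items
    .Zip(gate, (item, _) => item)                   // pair each item with a permit
    .Subscribe(item => released.Add(item));

// Push three items at once: only the first finds a permit waiting,
// the other two are buffered inside Zip.
items.OnNext("a1");
items.OnNext("a2");
items.OnNext("a3");
int afterPush = released.Count;

gate.OnNext(Unit.Default);                          // "previous item finished"
int afterFirstSignal = released.Count;

gate.OnNext(Unit.Default);
int afterSecondSignal = released.Count;

Console.WriteLine($"{afterPush}, {afterFirstSignal}, {afterSecondSignal}");
```

Each signal on the gate releases exactly one buffered item, which is what lets the operator hold back the next Observable.FromAsync until the previous one has terminated.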

You could then use this operator to solve the problem like this:

source
    .GroupBy(item => item.Key)
    .SelectMany(group => group.SelectOneByOne(
        item => Observable.FromAsync(() => ProcessAsync(item))))
    .Merge(maxConcurrent: N)
    .Wait();

The above solution is presented only for the purpose of answering the question. I don't think that I would trust it in a production environment.

Theodor Zoulias
  • Nice! Thanks! Although it wasn't working for me as is until I tried the following: source.GroupBy(item => item.Key).SelectMany(group => group.SelectOneByOne(item => Observable.FromAsync(() => onNext(item)))).Merge(maxConcurrent: 2).Subscribe(). So before I mark the answer, was your code working for you? – Kiryl May 06 '22 at 19:31
  • @Kiryl yep, you are right. I fixed the code. :-) – Theodor Zoulias May 06 '22 at 20:01