
I'm building a straightforward processing pipeline where an item is fetched as input, operated on by multiple processors sequentially, and finally output. The image below describes the overall architecture:

[image: rx-pipe — overall pipeline architecture]

The way it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item is processed, the output is notified. While an individual item is processed sequentially, multiple items may be processed in parallel (depending on how fast they are fetched from the provider).

The IObservable created and returned from the pipeline looks like this:

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }
    observer.OnCompleted(); // Signal completion once the provider runs out.
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
            .Unwrap()))); // Unwrap Task<T> from Task<Task<T>>.

The missing part: I need a control mechanism that limits how many items (at most) can be in the pipeline at any given time.

For example, if the maximum number of parallel processings is 3, that would result in the following workflow:

  1. Item 1 is fetched and passed to the processors.
  2. Item 2 is fetched and passed to the processors.
  3. Item 3 is fetched and passed to the processors.
  4. Item 1 completed processing.
  5. Item 4 is fetched and passed to the processors.
  6. Item 3 completed processing.
  7. Item 5 is fetched and passed to the processors.
  8. Etc...
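The desired behaviour is not Rx-specific: fetching must be gated so that an item is pulled from the provider only while fewer than 3 items are in flight. As a rough illustration (not the Rx.NET solution), here is a minimal Python asyncio sketch of that back-pressure idea; the item numbers and delays are made up:

```python
import asyncio

MAX_IN_FLIGHT = 3

async def run_pipeline(items, delays):
    """Fetch items one by one, but never let more than MAX_IN_FLIGHT
    be inside the pipeline (fetched but not yet done) at once."""
    events = []                               # ordered log: "fetch n" / "done n"
    in_flight = 0
    max_seen = 0
    slots = asyncio.Semaphore(MAX_IN_FLIGHT)  # free pipeline slots

    async def process(item, delay):
        nonlocal in_flight
        await asyncio.sleep(delay)            # stand-in for the processors
        events.append(f"done {item}")
        in_flight -= 1
        slots.release()                       # a slot frees up -> next fetch may run

    tasks = []
    for item, delay in zip(items, delays):
        await slots.acquire()                 # block the fetch until a slot is free
        events.append(f"fetch {item}")
        in_flight += 1
        max_seen = max(max_seen, in_flight)
        tasks.append(asyncio.create_task(process(item, delay)))
    await asyncio.gather(*tasks)
    return events, max_seen

events, max_seen = asyncio.run(
    run_pipeline([1, 2, 3, 4, 5], [0.06, 0.10, 0.02, 0.04, 0.02]))
```

Items 1–3 are fetched immediately; item 4 is fetched only after one of them finishes, exactly as in the numbered workflow above.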
Enigmativity
Jaanus Varus

2 Answers


Merge provides an overload which takes a max concurrency.

Its signature looks like: IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);

Here is what it would look like with your example (I refactored some of the other code as well, which you can take or leave):

return Observable
    // Reactive while loop; it also takes care of OnCompleted for you.
    .While(() => _provider.HasNext,
           Observable.FromAsync(_provider.GetNextAsync))
    // Return deferred observables that only execute after subscription.
    .Select(item => Observable.Defer(() =>
        _processors.Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync))))
    // Only allow 3 streams to execute in parallel.
    .Merge(3);

To break down what this does:

  1. On each iteration, While checks whether _provider.HasNext is true; if so, it resubscribes to get the next value from _provider, otherwise it emits OnCompleted.
  2. Inside the Select, a new observable stream is created but not yet evaluated, thanks to Defer.
  3. The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to a maximum of 3 inner observables simultaneously.
  4. Each inner observable is finally evaluated when it is subscribed to.
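The Defer + Merge(maxConcurrency) combination can be sketched outside Rx as well. In this hedged Python asyncio analogue, a zero-argument factory plays the role of a deferred observable and a semaphore enforces the concurrency cap; the `merge` helper and factory names are illustrative, not part of any library:

```python
import asyncio

async def merge(factories, max_concurrency):
    """Rough analogue of Merge(int): run at most `max_concurrency`
    inner sources at once. Like Defer, a factory is only invoked
    once a slot is available, never earlier."""
    results = []
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(factory):
        async with sem:                       # wait for one of the N slots
            results.append(await factory())   # "subscribe" = invoke and await

    await asyncio.gather(*(run_one(f) for f in factories))
    return results

started = []

def make_factory(n):
    async def inner():                        # deferred inner source
        started.append(n)                     # records when it actually starts
        await asyncio.sleep(0.01)
        return n * 10
    return inner

results = asyncio.run(merge([make_factory(n) for n in range(5)], 3))
```

Only the first three factories start immediately; the remaining two are invoked as slots free up.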

Alternative 1

If you also need to control the number of parallel requests, you need to get a little trickier, since you have to signal when your observable is ready for new values:

return Observable.Create<T>(observer => 
{
  var subject = new Subject<Unit>();
  var disposable = new CompositeDisposable(subject);

  disposable.Add(subject
    //This will complete when provider has run out of values
    .TakeWhile(_ => _provider.HasNext)
    .SelectMany(
      _ => _provider.GetNextAsync(),
     (_, item) => 
     {
       return _processors
        .Aggregate(
         seed: Observable.Return(item),
         func: (current, processor) => current.SelectMany(processor.ProcessAsync))
        //Could also use `Finally` here, this signals the chain
        //to start on the next item.
        .Do(dontCare => {}, () => subject.OnNext(Unit.Default));
     }
    )
    .Merge(3)
    .Subscribe(observer));

  //Queue up 3 requests for the initial kickoff
  disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

  return disposable;
});
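The Subject-based handshake above can be sketched in the same hedged Python asyncio style: a queue of tokens stands in for the Subject&lt;Unit&gt;, three initial tokens mirror Observable.Repeat(Unit.Default, 3), and each completed item enqueues a token just like the OnNext inside Do:

```python
import asyncio

async def bounded_pipeline(items, max_in_flight=3):
    log = []
    tokens = asyncio.Queue()              # plays the role of Subject<Unit>
    for _ in range(max_in_flight):        # initial kickoff: 3 "Unit" tokens
        tokens.put_nowait(None)

    async def process(item):
        await asyncio.sleep(0.01)         # stand-in for the processor chain
        log.append(("done", item))
        tokens.put_nowait(None)           # the Do/OnNext: free a slot

    tasks = []
    for item in items:                    # the TakeWhile(_ => _provider.HasNext) loop
        await tokens.get()                # wait for a signal before fetching
        log.append(("fetch", item))
        tasks.append(asyncio.create_task(process(item)))
    await asyncio.gather(*tasks)
    return log

log = asyncio.run(bounded_pipeline([1, 2, 3, 4, 5]))
```

Fetching the 4th item is possible only after a token arrives, i.e. after one of the first three items has left the pipeline.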
paulpdaniels
  • Thanks for the contribution! While `Merge` controls how many items will be processed in parallel, the items are still provided as quickly as possible (this also needs to be limited). It seems that using `Merge` doesn't affect the code run inside `.While(() => _provider.HasNext, Observable.FromAsync(_provider.GetNextAsync))`. – Jaanus Varus Jun 21 '15 at 08:44
  • @Discosultan Ahh, I had assumed the `GetNextAsync` was some sort of long running method that naturally limited the speed, but you are correct the `While` is not tied to the `Merge`. It will still only allow a single `GetNextAsync` to be in flight at a time, so the speed will be linked to how fast that method ends up returning. Should it be querying based on some uniform time constant? – paulpdaniels Jun 21 '15 at 17:42
  • You are correct that the speed is linked to how fast the method ends up returning and that is fine. What I need to limit is how many are allowed to be returned and be in the pipeline at any given time. So if for example the limit is 3, that would mean that 3 items can be fetched sequentially and 4th one can be fetched *only after one of the items is finished processing/outputs the pipeline*. – Jaanus Varus Jun 21 '15 at 18:09
  • @Discosultan I updated my answer with an example that will handle that requirement. You essentially need to use a subject to signal the chain when it can start processing the next value. – paulpdaniels Jun 21 '15 at 21:41
  • Great idea to use `Subject` like that - working as expected :). One thing I noticed is that the initial kickoff `Observable.Repeat(Unit.Default, 3).Subscribe(subject)` did not work because the subject always completed and signalling the chain in `Do` or `Finally` had no effect. For initial kickoff, I simply call `subject.OnNext` as many times as required and this seems to do the trick. Anyway, thanks for the effort! – Jaanus Varus Jun 22 '15 at 07:12
  • Ah, yes that is correct, I should have made it `.Subscribe(subject.OnNext)`, which would stop the complete method from firing. Either way, glad it's working! – paulpdaniels Jun 22 '15 at 07:27

You might need to rearrange the code you posted but this would be one way to do it:

var eventLoopScheduler = new EventLoopScheduler();

(from semaphore in Observable.Return(new Semaphore(2, 2))
 from input in GetInputObs()
 from getAccess in Observable.Start(() => semaphore.WaitOne(), eventLoopScheduler)
 from output in ProcessInputOnPipeline(input)
        .SubscribeOn(Scheduler.Default)
        .Finally(() => semaphore.Release())
 select output)
 .Subscribe(x => Console.WriteLine(x), ex => { });

I've modelled your pipeline as one Observable (which in reality would be composed of several smaller observables chained together).

The key thing is to make sure that the semaphore gets released no matter how the pipeline terminates (Empty/Error); otherwise the stream might hang, so Finally() is used to call Release() on the semaphore. (It might be worth adding a Timeout on the pipeline observable as well, if it is liable to never OnComplete()/OnError().)
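The acquire/release-in-Finally discipline translates to most async runtimes. As a hedged sketch (Python asyncio rather than Rx, with an artificial failure injected), this shows that a failing item still frees its slot, so later items are not starved:

```python
import asyncio

async def process_with_slot(sem, item, fail=False):
    await sem.acquire()                   # the WaitOne
    try:
        if fail:
            raise RuntimeError(f"item {item} failed")
        await asyncio.sleep(0.01)         # stand-in for the pipeline
        return item
    finally:
        sem.release()                     # the Finally: always free the slot

async def demo():
    sem = asyncio.Semaphore(2)
    results = await asyncio.gather(
        *(process_with_slot(sem, i, fail=(i == 2)) for i in range(4)),
        return_exceptions=True)
    return results, sem.locked()          # locked() is False once slots are free

results, locked = asyncio.run(demo())
```

Item 2 errors out, yet item 3 still acquires a slot and completes, and both permits are free at the end.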

Edit:

As per below comments, I've added some scheduling around the semaphore access so that we don't block whoever is pushing these inputs into our stream. I've used an EventLoopScheduler so that all requests for semaphore access will queue up and be executed on 1 thread.

Edit: I do prefer Paul's answer though - simple, less scheduling, less synchronisation (merge uses a queue internally).

Chris
  • Using a semaphore works great! Two questions: 1) What is the purpose of `.SubscribeOn(Scheduler.Default)`? 2) I wonder if there is any workaround to having to block a thread while waiting for the semaphore to be signalled? I might be overthinking it :) – Jaanus Varus Jun 20 '15 at 21:57
  • The `from semaphore in Observable.Return(new Semaphore(2,2)` is a very smart move. Well done for isolating state within the observable. – Enigmativity Jun 21 '15 at 04:05
  • @Discosultan - 1) Purpose of the SubcribeOn is to schedule your pipeline on the ThreadPool, which means your stream can continue processing other inputs. 2) Fair Point. Will amend solution with an edit. – Chris Jun 21 '15 at 08:17
  • Thanks for your contribution! While your solution worked great, I went with @paulpdaniels's, since he managed to avoid manually using threading constructs such as `Semaphore`. – Jaanus Varus Jun 22 '15 at 07:18