
I have an async function that I want to invoke on every observation in an IObservable sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight, and this is also the Rx contract, if I understand it correctly.

Consider this sample:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}

The Consume function fakes a 750 ms processing time, and ob produces events every 100 ms. The code above works, but calls task.Wait() on a random thread. If I instead subscribe as in the commented out line 3, then Consume is invoked at the same rate at which ob produces events (and I cannot even grok what overload of Subscribe I am using in this commented statement, so it is probably nonsense).

So how do I correctly deliver one event at a time from an observable sequence to an async function?

1 Answer


Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.

Instead, consider your async method to be a single value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.

Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.

The breakdown of his code is as follows.

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();
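
Applied to the sample from the question, the same composition might look like the sketch below. It assumes the System.Reactive package and reuses the question's Consume method unchanged. Concat only subscribes to the next inner sequence once the previous one has completed, so at most one Consume call should be in flight at a time.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));

        var d = ob
            // FromAsync is deferred: Consume(x) is not called until the
            // inner sequence is subscribed to.
            .Select(x => Observable.FromAsync(() => Consume(x)))
            // Concat subscribes to the inner sequences one after another.
            .Concat()
            .Subscribe();

        Thread.Sleep(10000);
        d.Dispose();
    }

    // Same fake consumer as in the question: 750 ms of simulated work.
    static async Task<Unit> Consume(long count)
    {
        Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
        await Task.Delay(750);
        Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
        return Unit.Default;
    }
}
```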

In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue is needed to collect values while they wait. Personally, I prefer to make this explicit by putting the data into a queue. Alternatively, you could explicitly use a Scheduler to signal that the threading model is what should pick up the slack.
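
As a rough illustration of what an explicit queue could look like, here is one possible sketch. It is not from the original answer: it assumes System.Threading.Channels (.NET Core 3.0 or later, with C# 8 for `await foreach`), picks an unbounded channel purely for simplicity, and reuses the question's Consume method. The Rx subscriber only enqueues, and a single consumer loop awaits one item at a time.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class Program
{
    static async Task Main()
    {
        // The explicit queue (assumption: unbounded; a bounded channel
        // would be the place to decide between waiting and dropping).
        var queue = Channel.CreateUnbounded<long>();
        using var cts = new CancellationTokenSource();

        var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // The subscriber does no long-running work: it only enqueues.
        var d = ob.Subscribe(
            x => queue.Writer.TryWrite(x),
            ex => queue.Writer.TryComplete(ex),
            () => queue.Writer.TryComplete());

        // Single consumer loop: the next item is read only after the
        // previous Consume call has finished.
        var consumer = Task.Run(async () =>
        {
            try
            {
                await foreach (var x in queue.Reader.ReadAllAsync(cts.Token))
                    await Consume(x);
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested; items still queued are abandoned.
            }
        });

        await Task.Delay(10000);
        d.Dispose();   // stop enqueuing
        cts.Cancel();  // stop the consumer loop
        await consumer;
    }

    static async Task<Unit> Consume(long count)
    {
        Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
        await Task.Delay(750);
        Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
        return Unit.Default;
    }
}
```

With the queue in the open, the backlog is visible and the policy for it (grow, block, or drop) becomes a deliberate choice rather than a side effect of the operators.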

This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers. There are many reasons that the guidance is to not put them in your subscriber, for example:

1. You break the error model.
2. You are mixing async models (Rx here, Task there).
3. Subscribe is the consumer of a composition of async sequences. An async method is just a single-value sequence, so by that view it can't be the end of the sequence, though its result might be.

UPDATE

To illustrate the comment about breaking the error model, here is an update of the OP's sample.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

Here we can see that if the OnNext handler were to throw, we would not be protected by our Rx OnError handler. The exception would be unhandled and would most likely bring down the application.
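
For contrast, here is a minimal sketch of the same failing call moved out of the Subscribe handler and into the composition, again assuming the System.Reactive package. In this form the faulted task should surface as an OnError notification, so the error handler gets a chance to run instead of the exception escaping from OnNext.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        var d = Observable.Interval(TimeSpan.FromMilliseconds(100))
            // The async call is now part of the sequence, not the subscriber.
            .Select(x => Observable.FromAsync(() => ConsumeThrows(x)))
            .Concat()
            .Subscribe(
                _ => { },
                // A failure of the inner sequence is delivered here.
                ex => Console.WriteLine($"OnError: {ex.Message}"));

        Thread.Sleep(1000);
        d.Dispose();
    }

    // Same failing consumer as in the sample above.
    static async Task<Unit> ConsumeThrows(long count)
    {
        return await Task.FromException<Unit>(new Exception("some failure"));
    }
}
```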

Lee Campbell
  • Thank you, your explanation how this composition works is very good! In my case, I have not a lot of choice, as the consumer is a given (gRPC streams that are declared `async`). I do not expect it to be the bottleneck under normal network conditions though, and if it is, I am likely having more problems than could be handled with a reasonable recovery strategy--I cannot even prefer at this point implicit queueing vs e. g. dropping data. I'll consider explicit queues, but as it stands, this solution is certainly okay for my purposes. – kkm inactive - support strike May 10 '16 at 07:59
  • Lee, could you please clarify what you mean by "breaking the error model"? When I throw an exception from Consume, it terminates the sequence with `OnError`. At first sight, errors seem to do what they should. Am I missing anything? – kkm inactive - support strike May 11 '16 at 00:15
  • If, in your example in the OP, you add either error statement `throw new Exception("some failure");` or `return await Task.FromException(new Exception("some failure"));`, you will bring down the application. You will not hit your `OnError` handler. However, in the sample code where the async method is moved out of the Subscribe into a `Select`, the sequence is terminated with an `OnError` that can be handled gracefully. – Lee Campbell May 11 '16 at 02:23
  • I think I see--you are listing the problems in my original attempt, not shortcomings of your example, correct? I knew all along my non-solution was total crap and nonsense, because of the Wait on a borrowed thread. – kkm inactive - support strike May 11 '16 at 02:27
  • Correct. I have updated the answer to hopefully remove that confusion. – Lee Campbell May 11 '16 at 02:29
  • Thanks. By the way, the argument to `FromAsync` should be `() => asyncMethod(l)`, not just `asyncMethod(l)`. Could you please fix this? – kkm inactive - support strike May 11 '16 at 02:31
  • If you want to execute the async action only once and publish its value to all subscribers, add .Publish() to the end of the observable chain. This produces a connectable observable on which you will have to call .Connect() to start the sequence for all subscribers. If you do not use Publish, this code will execute the async task again for each subscriber, which can lead to a lot of executions. – nh43de Dec 19 '18 at 17:44
  • If you publish that internal Async call, you now have another resource to manage. When will you disconnect the published sequence? – Lee Campbell Jan 02 '19 at 00:41
  • You must be very careful with this approach. If your async handler throws an exception, the subscription will be terminated, and your handler will stop receiving events. Basically, if you need asynchronous processing of observable events, you have to queue them and consume them in a separate background task. There is no way around it - I learned that the hard way. – C-F Nov 04 '22 at 02:47
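
On the last comment: if one failed item should not end the whole subscription, one option is to handle the error inside each inner sequence before Concat sees it. The sketch below is an assumption layered on the answer's composition, not something the thread itself proposes. It uses the Rx Catch operator to log a failure and substitute an empty sequence, and its Consume method is a hypothetical stand-in that fails on every third item.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        var d = Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Select(x => Observable.FromAsync(() => Consume(x))
                // Handle the failure per item, so the outer sequence
                // never receives an OnError from this inner sequence.
                .Catch<Unit, Exception>(ex =>
                {
                    Console.WriteLine($"Skipping item: {ex.Message}");
                    return Observable.Empty<Unit>();
                }))
            .Concat()
            .Subscribe(
                _ => { },
                ex => Console.WriteLine($"OnError: {ex.Message}"));

        Thread.Sleep(1000);
        d.Dispose();
    }

    // Hypothetical consumer for illustration: fails on every third item.
    static async Task<Unit> Consume(long count)
    {
        await Task.Delay(50);
        if (count % 3 == 0)
            throw new InvalidOperationException($"item {count} failed");
        Console.WriteLine($"Consumed {count}");
        return Unit.Default;
    }
}
```

Whether a failed item should be skipped, retried, or allowed to end the sequence depends on the consumer, so treat the empty-sequence fallback here as just one possible policy.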