I have an asnyc
function that I want to invoke on every observation in an IObservable
sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight; and this is also the RX contract, if I understand it correctly.
Consider this sample:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
The Consume
function fakes a 750 ms processing time, and ob
produces events every 100 ms. The code above works, but calls task.Wait()
on a random thread. If I instead subscribe as in the commented out line 3, then Consume
is invoked at the same rate at which ob
produces events (and I cannot even grok what overload of Subscribe
I am using in this commented statement, so it is probably nonsense).
So how do I correctly deliver one event at a time from an observable sequence to an async
function?