How about something like this:
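    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;

    public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> projectAsync)
    {
        return Observable.Create<TResult>(
            observer =>
            {
                // Seeded with a value so that the first source item is released immediately.
                var throttle = new BehaviorSubject<TResult>(default);
                var observable = source
                    // Zip releases one source item per value emitted by the throttle.
                    .Zip(throttle, (value, _) => value)
                    .SelectMany(value => Observable.Defer(() => Observable.StartAsync(() => projectAsync(value))))
                    .Publish();
                return new CompositeDisposable(
                    // Each result is fed back into the throttle, releasing the next queued item.
                    observable.Subscribe(throttle),
                    observable.Subscribe(observer),
                    observable.Connect(),
                    throttle
                );
            }
        );
    }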
In this extension method, the Zip operator combined with the BehaviorSubject forms a throttle: incoming items are queued until the current projectAsync call has completed, and only then is the next item released.
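To see the gating in isolation, here is a minimal sketch that drives the same Zip/BehaviorSubject pairing by hand. The class ZipGateSketch and its Run method are hypothetical names used only for this illustration, and the gate is signalled manually rather than by a completed task.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipGateSketch
{
    // Hypothetical demo helper (not part of the extension method above).
    // Returns how many values had been released before any manual gate signal,
    // together with the full list of released values.
    public static (int ReleasedBeforeSignals, List<int> Released) Run()
    {
        var released = new List<int>();
        var source = new Subject<int>();

        // The seed value plays the role of an initial "go" signal.
        var gate = new BehaviorSubject<Unit>(Unit.Default);

        using (source.Zip(gate, (value, _) => value)
                     .Subscribe(value => released.Add(value)))
        {
            source.OnNext(1); // pairs with the seed, released immediately
            source.OnNext(2); // queued inside Zip
            source.OnNext(3); // queued inside Zip

            int releasedBeforeSignals = released.Count; // 1

            gate.OnNext(Unit.Default); // releases 2
            gate.OnNext(Unit.Default); // releases 3

            return (releasedBeforeSignals, released);
        }
    }
}
```

In SelectAsync the gate signal is not raised by hand: every result produced by projectAsync is pushed into the throttle subject, which is what releases the next queued item.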
It can then be used as follows:
    using System;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Threading.Tasks;

    public static async Task<Unit> DoSomethingAsync(int value)
    {
        Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Started processing value '{value}'");
        // Simulate one second of asynchronous work.
        await Task.Delay(TimeSpan.FromSeconds(1));
        Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Completed processing value '{value}'");
        return Unit.Default;
    }

    public static async Task RunAsync()
    {
        // Emits the values 0 to 24, one every 100 ms.
        IObservable<int> source = Observable.Generate(0, value => value < 25, value => value + 1, value => value, value => TimeSpan.FromSeconds(0.1));
        await source
            .Do(value => Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Received value '{value}'"))
            .SelectAsync(value => DoSomethingAsync(value))
            .ToTask();
    }
Here, the source observable emits 25 items at 100 ms intervals, and the DoSomethingAsync method uses Task.Delay to simulate a 1-second processing delay. Running this code should produce output similar to the following:
[02:07:56] Received value '0'
[02:07:56] Started processing value '0'
[02:07:56] Received value '1'
[02:07:56] Received value '2'
[02:07:57] Received value '3'
[02:07:57] Received value '4'
[02:07:57] Received value '5'
[02:07:57] Received value '6'
[02:07:57] Received value '7'
[02:07:57] Received value '8'
[02:07:57] Received value '9'
[02:07:57] Completed processing value '0'
[02:07:57] Started processing value '1'
[02:07:57] Received value '10'
[02:07:57] Received value '11'
[02:07:58] Received value '12'
[02:07:58] Received value '13'
[02:07:58] Received value '14'
[02:07:58] Received value '15'
[02:07:58] Received value '16'
[02:07:58] Received value '17'
[02:07:58] Received value '18'
[02:07:58] Completed processing value '1'
[02:07:58] Started processing value '2'
[02:07:58] Received value '19'
[02:07:58] Received value '20'
[02:07:59] Received value '21'
[02:07:59] Received value '22'
[02:07:59] Received value '23'
[02:07:59] Received value '24'
[02:07:59] Completed processing value '2'
[02:07:59] Started processing value '3'
[02:08:00] Completed processing value '3'
[02:08:00] Started processing value '4'
[02:08:01] Completed processing value '4'
...
[02:08:20] Started processing value '23'
[02:08:21] Completed processing value '23'
[02:08:21] Started processing value '24'
[02:08:22] Completed processing value '24'
Be aware that this code applies no back-pressure to the source. If the source continually emits items faster than projectAsync can process them, the backlog queued inside the Zip operator will keep growing until you eventually receive an out-of-memory exception.
Furthermore, although I don't know the use case behind this requirement, you might want to consider whether "System.Interactive.Async" or "System.Threading.Tasks.Dataflow" would be a better fit here.
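As a rough illustration of the Dataflow option, the sketch below processes items one at a time through a TransformBlock with a bounded buffer, so that the producer is made to wait when the buffer is full. The class DataflowSketch, the method ProcessSequentiallyAsync, and the default capacity of 4 are assumptions made for this example, and it pulls from an IEnumerable rather than an IObservable to keep it self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical helper: runs projectAsync over the items sequentially and in order,
    // collecting the results into a list.
    public static async Task<List<TResult>> ProcessSequentiallyAsync<TSource, TResult>(
        IEnumerable<TSource> items,
        Func<TSource, Task<TResult>> projectAsync,
        int boundedCapacity = 4) // assumed value, tune for the real workload
    {
        var results = new List<TResult>();

        var transform = new TransformBlock<TSource, TResult>(
            projectAsync,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,       // one projectAsync call at a time
                BoundedCapacity = boundedCapacity // a full buffer makes SendAsync wait
            });

        var collect = new ActionBlock<TResult>(result => results.Add(result));

        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in items)
        {
            // Completes only once the block has accepted the item; this is the back-pressure.
            await transform.SendAsync(item);
        }

        transform.Complete();
        await collect.Completion;
        return results;
    }
}
```

Unlike the Zip-based throttle, the backlog here cannot grow beyond the configured capacity, at the cost of the producer having to await each send.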