4

I have a list of operations to complete, and I want to return an observable that is notified when all of those observables have completed (returning the status of each operation would be ideal):

foreach (var id in service.FetchItems().ToEnumerable().ToArray())
{
    service.Delete(id); // <- returns IObservable<Unit>
}
// something.Wait();

service.FetchItems() returns IObservable<string>, and service.Delete(...) returns IObservable<Unit>.

Is the following approach correct?

service.FetchItems().ForEachAsync(id => service.Delete(id)).ToObservable().Wait();
Alex Netkachov

2 Answers

2

I would avoid awaiting and tasks altogether and just stick with plain Rx for this.

Try this approach:

var query =
    from id in service.FetchItems()
    from u in service.Delete(id)
    select id;

query
    .ToArray()
    .Subscribe(ids =>
    {
        /* all fetches and deletes done now */
    });

The .ToArray() operator in Rx takes an IObservable<T> that emits zero or more T's and returns an IObservable<T[]> that emits a single array containing all of those T's, and it does so only when the source observable completes.
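
To try this without a real backend, here is a minimal sketch. FakeService is a hypothetical stand-in for the asker's service (the real FetchItems and Delete may well behave differently), and it assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical stand-in for the asker's service; not part of the original code.
class FakeService
{
    public List<string> Deleted { get; } = new List<string>();

    public IObservable<string> FetchItems() =>
        new[] { "a", "b", "c" }.ToObservable();

    // Defer makes the delete run only when someone subscribes to the result.
    public IObservable<Unit> Delete(string id) =>
        Observable.Defer(() =>
        {
            Deleted.Add(id);
            return Observable.Return(Unit.Default);
        });
}

static class Program
{
    static void Main()
    {
        var service = new FakeService();

        var query =
            from id in service.FetchItems()
            from u in service.Delete(id)
            select id;

        // The query is lazy: nothing is deleted until it is subscribed to.
        Console.WriteLine(service.Deleted.Count);

        query
            .ToArray()
            .Subscribe(ids =>
            {
                // Called once, after the whole query has completed.
                Console.WriteLine(string.Join(",", ids));
            });
    }
}
```

The point of the sketch is that the Subscribe callback runs once, with every id whose delete completed, rather than once per id.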

Enigmativity
  • What if the observables are different types? IObservable and IObservable. An array will not work for me. – Nick Gallimore Jul 08 '20 at 15:02
  • @NickGallimore - `FetchItems` and `Delete` are observables that return different types, yet the query works fine. Are you seeing a different issue? – Enigmativity Jul 09 '20 at 03:45
0

ToEnumerable blocks the calling thread while it waits for the next element in the sequence. You could do this instead:

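// ToTask() is an extension method in System.Reactive.Threading.Tasks.
Task delAllTask = service.FetchItems()
    .SelectMany(service.Delete)
    .DefaultIfEmpty() // ToTask() faults on an empty sequence, so supply a default value
    .ToTask();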

then you can block on the task or continue asynchronously e.g.

delAllTask.Wait();
delAllTask.ContinueWith(...);
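
If the calling code is already async, awaiting the task is usually tidier than Wait() or ContinueWith. A rough sketch, where the static FetchItems and Delete methods are hypothetical stand-ins for the service calls, and Count() is my own addition so that the task yields the number of completed deletes (it also emits a value when there are no items, which matters because ToTask() faults on an empty sequence):

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class DeleteAllExample
{
    // Hypothetical stand-ins for service.FetchItems() and service.Delete(id).
    static IObservable<string> FetchItems() => new[] { "a", "b" }.ToObservable();
    static IObservable<Unit> Delete(string id) => Observable.Return(Unit.Default);

    public static async Task<int> DeleteAllAsync()
    {
        // Count() emits exactly one value when the merged deletes complete,
        // even if FetchItems() produced nothing.
        Task<int> delAllTask = FetchItems()
            .SelectMany(id => Delete(id))
            .Count()
            .ToTask();

        // Continue asynchronously instead of blocking with Wait().
        return await delAllTask;
    }
}
```

If any Delete fails, the error propagates through the task and is rethrown at the await.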
Lee