31

I would like to call back an async function within an Rx subscription.

For example:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

What needs to be done to catch the exception properly?

Theodor Zoulias
Martin Komischke
  • I just figured out that I can put async first. Unfortunately, this doesn't solve the problem I am facing. I will post a more explicit example. – Martin Komischke Apr 11 '14 at 08:38
  • possible duplicate of [Is there a way to subscribe an observer as async](http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async) – Zache Apr 11 '14 at 08:48

4 Answers

43

Ana Betts' answer works in most scenarios, but if you want the stream to wait for each async call to finish before the next element is processed, you need something like this:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

Or:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();
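
To also observe a failure from the async method, one option is to pass an error handler to Subscribe. The following is only a sketch: `HandleAsync` and `ConcatDemo` are hypothetical names invented for illustration, and the handler is made to fail on its third call so the error path is exercised.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ConcatDemo
{
    // Hypothetical async handler, invented for this sketch: it fails on the third tick.
    private static async Task<long> HandleAsync(long tick)
    {
        await Task.Delay(20);
        if (tick == 2) throw new InvalidOperationException("tick 2 failed");
        return tick;
    }

    // Runs the pipeline until it fails and returns a log of what the subscriber saw.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var finished = new TaskCompletionSource<bool>();

        using var subscription = Observable.Interval(TimeSpan.FromMilliseconds(50))
            .Select(tick => Observable.FromAsync(() => HandleAsync(tick)))
            .Concat() // subscribes to one inner observable at a time
            .Subscribe(
                value => log.Add($"handled {value}"),
                ex =>
                {
                    // The exception thrown inside HandleAsync arrives here.
                    log.Add($"failed: {ex.Message}");
                    finished.TrySetResult(true);
                });

        await finished.Task;
        return log;
    }
}
```

Note that the error ends the subscription: once the handler has failed, no further ticks are processed unless the pipeline is resubscribed or the failure is caught inside the inner observable.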
TeaDrivenDev
reijerh
  • It's another use case but very interesting though. – Martin Komischke Mar 04 '16 at 11:08
  • You must be very careful with this approach. If your async handler throws an exception, the subscription will be terminated, and your handler will stop receiving events. Basically, if you need asynchronous processing of observable events, you have to queue them and consume them in a separate background task. There is no way around it - I learned that the hard way. – C-F Nov 04 '22 at 02:43
21

Change this to:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

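Subscribe doesn't keep the async operation inside the Observable: an async lambda passed to Subscribe becomes async void, so the sequence never sees its task. SelectMany merges the task into the sequence, so its result or exception flows on to the subscriber.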
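
As a sketch of how the exception could then be caught, an error handler can be passed to Subscribe. The class `SelectManyDemo` and the method `CaptureErrorAsync` are hypothetical names for this example, and `RunAsync` here is a stand-in that always throws, like the question's `Service.Do()`.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SelectManyDemo
{
    // Stand-in for the question's RunAsync; it always fails.
    private static async Task<string> RunAsync()
    {
        await Task.Delay(200);
        throw new ArgumentException("invalid!");
    }

    // Completes with the exception the pipeline delivered, or null if it finished normally.
    public static Task<Exception> CaptureErrorAsync()
    {
        var captured = new TaskCompletionSource<Exception>();

        Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Subscribe(
                result => Console.WriteLine(result),
                ex => captured.TrySetResult(ex),      // the ArgumentException arrives here
                () => captured.TrySetResult(null));

        return captured.Task;
    }
}
```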

Ana Betts
  • Thanks Paul, for your suggestion. Very interesting. Do you have something where I can read a little bit further about that? – Martin Komischke May 05 '14 at 07:25
  • Note for others using this solution, if you need to keep the order just replace `SelectMany` with `Select` and then `.Concat` (similar to reijerh's answer). – bokibeg Oct 28 '19 at 19:53
  • This will produce an observable sequence of unfinished tasks. What do you do with them next? Your "subscriber" does nothing. All the tasks hang unobserved, no control over exceptions/termination. This is not a solution. – C-F Nov 04 '22 at 02:25
19

You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void.

In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results. In that case, use SelectMany to call the async method for each element, and Replay to cache (plus a Connect to get the ball rolling):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

I changed the Results property to be returned from the Trigger method instead, which I think makes more sense, so the test now looks like:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}
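
Since the service in the question always throws, a test of this version could assert on the exception directly. This is a sketch, assuming NUnit 3 or later for `Assert.ThrowsAsync`; the `Consumer` and `Service` classes are repeated in compact form only so that the example compiles on its own, and the name `ConsumerTests` is invented here.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

// Compact copies of the classes above so the sketch is self-contained.
public class Service
{
    public Task<string> DoAsync() => Task.Run(() => Do());

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
    }
}

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => _service.DoAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }
}

[TestFixture]
public class ConsumerTests
{
    [Test]
    public void Trigger_SurfacesServiceException()
    {
        var sut = new Consumer();
        var results = sut.Trigger();

        // Awaiting the observable rethrows the exception delivered through OnError.
        Assert.ThrowsAsync<ArgumentException>(async () => await results.FirstAsync());
    }
}
```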
Stephen Cleary
  • That is an awesome solution. Thanks Stephen. – Martin Komischke Apr 11 '14 at 16:16
  • What if you want to catch exceptions thrown from RunAsync? What I got is: `obs.SelectMany(x => Observable.FromAsync(() => RunAsync()).Catch(Observable.Empty()))`. Is there a better way? – Victor Grigoriu Apr 01 '15 at 14:59
  • @VictorGrigoriu: I recommend you ask your own question and tag it with `system.reactive`. – Stephen Cleary Apr 01 '15 at 19:09
  • This solution does not preserve the ordering of results (just in case a reader is trying to achieve that). To preserve ordering look at the answers below. – bboyle1234 Aug 23 '19 at 09:32
  • And here's @Victor Grigoriu's follow-up question (with an answer): https://stackoverflow.com/q/29427680/68955 – Reyhn Jun 01 '23 at 21:03
1

Building on reijerh's answer, I created an extension method.

public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
    source.Select(_ => Observable.FromAsync(action))
          .Concat()
          .Subscribe();

If I understand this correctly, each async call should finish before the next element is handled, because Concat subscribes to one inner observable at a time. It also lets you call SubscribeAsync and pass in your async delegate directly, which in my opinion makes the code a little more readable.

WhenSomethingHappened
    .SubscribeAsync(async () => { 
        await DoFirstAsyncThing(); 
        await DoSecondAsyncThing(); 
    });
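
If the handler needs the element itself, or if failures should be reported rather than left unhandled, the same idea could be extended along these lines. This overload is hypothetical and not part of Rx; the class name `ObservableAsyncExtensions` and the parameter names are assumptions made for this sketch.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableAsyncExtensions
{
    // Hypothetical overload, not part of Rx: runs onNextAsync for one element at a time
    // and reports a failure from the source or from the handler to onError.
    public static IDisposable SubscribeAsync<T>(
        this IObservable<T> source,
        Func<T, Task> onNextAsync,
        Action<Exception> onError) =>
        source.Select(item => Observable.FromAsync(() => onNextAsync(item)))
              .Concat()
              .Subscribe(_ => { }, onError);
}
```

As with the original extension, a failing handler still terminates the subscription; the error handler only makes that termination visible.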

Hackmodford