0

I'm experiencing a deadlock when I use blocking code with Task.Wait(), waiting an async method that inside awaits an Rx LINQ query.

This is an example:

public void BlockingCode() 

{

    this.ExecuteAsync().Wait();

}

public async Task ExecuteAsync() 

{

    await this.service.GetFooAsync().ConfigureAwait(false);

    //This is the RX query that doesn't support ConfigureAwaitawait 
    await this.service.Receiver
      .FirstOrDefaultAsync(x => x == "foo")
      .Timeout(TimeSpan.FromSeconds(1));

}

So, my question is if there is any equivalent for ConfigureAwait on awaitable IObservable to ensure that the continuation is not resumed on the same SynchronizationContext.

David
  • 10,458
  • 1
  • 28
  • 40
  • 1
    You should NEVER EVER EVER use `Task.Wait()` and you should avoid using `Task.Result`. They will eventually cause a deadlock unless you know what you are doing...but if you did...you wouldn't use them either. – Aron May 15 '15 at 14:39
  • 2
    I know that thing, and that's why in my example I'm adding ConfigureAwait(false), which prevents any deadlock because it doesn't resume on the same SynchronizationContext than before the await call. The issue is that ConfigureAwait(false) doesn't exist for awaiting Observables... – Mauro Agnoletti May 15 '15 at 14:47
  • https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.observeon%28v=vs.103%29.aspx – Aron May 15 '15 at 14:50
  • Yes, I also tried that with ObserveOn(NewThreadScheduler.Default). But what continues in another thread is the delegate inside the Subscribe method. I'm not sure if what follows the await block also continues there. Like: //Here I'm on a specific Sinchronization Context await observable.ObserveOn(NewThreadScheduler.Default).Subscribe(() => { //This goes for sure in another thread. }); //Here, on what Synchronization Context am I? I don't want to be on the same one before awaiting... – Mauro Agnoletti May 15 '15 at 14:54
  • Then I can't help you with `this.service.Receiver`. Depends entirely on the implementation. – Aron May 15 '15 at 14:55
  • Here are some ways to avoid deadlocks with async-await http://www.nedstoyanov.com/async-await-deadlock/ – NeddySpaghetti May 16 '15 at 11:31

2 Answers2

5

You have to comprehend what "awaiting an Observable" means. Check out this. So, semantically, your code

await this.service.Receiver
    .FirstOrDefaultAsync(x => x == "foo")
    .Timeout(TimeSpan.FromSeconds(1));

is equivalent to

await this.service.Receiver
    .FirstOrDefaultAsync(x => x == "foo")
    .Timeout(TimeSpan.FromSeconds(1))
    .LastAsync()
    .ToTask();

(Note that there is some kind of redundance here, calling FirstOrDefaultAsyncand LastAsync but that's no problem).

So there you got your task (it can take an additional CancellationToken if available). You may now use ConfigureAwait.

Daniel C. Weber
  • 1,011
  • 5
  • 12
  • 1
    Thanks Daniel. I don't understand why do I need to do .LastAsync().ToTask() instead of .ToTask() directly. – Mauro Agnoletti May 18 '15 at 18:37
  • 1
    You are right, you don't have to, since ToTask already takes the last value in the sequence. I just wanted to make clear what code is equivalent to "awaiting an observable", and put emphasis on the fact that it is always the last value being taken. – Daniel C. Weber May 19 '15 at 07:01
  • Noted that you can pass cancellation token with `ToTask()` to unsubscribe the subscription. – shtse8 Aug 04 '18 at 19:51
2

ConfigureAwait is not directly related to awaiters themselves, rather it is a functionality of the TPL to configure how Task should complete. It's problematic because this TPL method doesn't return a new Task, so you can't compose it with a conversion to an observable.

Rx itself is basically free-threaded. You can control the threads used during subscription and events with far finer control than Tasks - see here for more on this: ObserveOn and SubscribeOn - where the work is being done

It's hard to fix your code because you don't provide a small but complete working example - however, the built in functions in Rx won't ever try to marshall to a particular thread unless you specifically tell them to with one of the above operators.

If you combine an operator like Observable.FromAsync to convert the Task to an observable, you can use Observable.SubscribeOn(Scheduler.Default) to start the Task off the current SynchronizationContext.

Here's a gist to play with (designed for LINQPad, runs with nuget package rx-main): https://gist.github.com/james-world/82c3cc39babab7870f6d

Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • I think that misses the point of the question. It is not about the context in which a value is observed but rather about the context in which the continuation will be scheduled. – Daniel C. Weber May 17 '15 at 18:00
  • I don't think it does miss the point. I'm saying that FromAsync and SubscribeOn give you the chance to control the context in which the continuation is scheduled. (And in the article you linked, Paul mentions using StartAsync and Defer - which FromAsync wraps - in much the same way) – James World May 17 '15 at 18:02
  • 1
    OP is trying to avoid a deadlock due to marshalling back to the SynchronizationContext in which the task is started (ala ConfigureAwait(true)), so ensuring you *aren't* on a context when the task is kicked off solves the problem. – James World May 17 '15 at 18:08
  • By the task that is "kicked off", you refer to `ExecuteAsync`? Of course, ensuring that is is already started on some thread pool thread solves this particular problem, but that will only hide the real problem in `ExecuteAsync`, i.e. the continuation is scheduled on the original context. – Daniel C. Weber May 17 '15 at 18:19
  • ...which is why I asked for a complete working program, best to solve the problem at that point, not in the Rx. I could have been clearer about that I guess. This is very much a workaround. – James World May 17 '15 at 18:20
  • If `ExecuteAsync` is some library-code then it might still be a good idea to use `ConfigureAwait(false)` throughout it so I hope my answer will be helpful. But I agree wrt. a complete working programm since the bug is probably in `GetFooAsync()` or `sender.Receiver` doing something upon subscription that deadlocks. – Daniel C. Weber May 17 '15 at 18:28
  • Thanks Daniel and James for the help and detailed information. It was very useful. I will combine the suggestions and use SubscribeOn for the origin of the Stream, which is generated with Observable.FromAsync, and I will add the .ToTask().ConfigureAwait(false) on the consumer code just to add extra safety. – Mauro Agnoletti May 18 '15 at 21:55
  • Which is the difference in this case by using Scheduler.Default versus NewThreadScheduler.Default? I have doubts of which scheduler is used in the case of Scheduler.Default. – Mauro Agnoletti May 19 '15 at 21:41