4

I'm working with an unmanaged library that mandates that all calls to its API is run on the same thread. We want to use the Reactive extensions's EventLoopScheduler to facilitate that since we'll be using Observable for other things.

I'm using a method similar to the Run method in the code sample below to execute code in the scheduler which will always run on the same thread. When I'm working with managed code this works as expected and all calls are run on the thread managed by the event loop and before / after the async call is the main thread.

But, when I call a P/Invoke (the one in the code sample is just an example, I'm not really calling this one in my code but the behavior is the same), the thread does run on the event loop thread, but so does everything after!

I've tried adding ConfigureAwait(true) (and false) but it doesn't change anything. I'm really confused by this behavior, why would calling a P/Invoke change the thread continuing after the await !!?

Here's the code to reproduce:

[DllImport("user32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
private static extern int MessageBox(IntPtr hWnd, string lpText, string lpCaption, uint uType);

public static Task Run(Action action, IScheduler scheduler)
{
    return Observable.Start(action, scheduler).SingleAsync().ToTask();
}

public static string ThreadInfo() =>
    $"\"{Thread.CurrentThread.Name}\" ({Thread.CurrentThread.ManagedThreadId})";

private static async Task Main(string[] args)
{
    var scheduler = new EventLoopScheduler();

    Console.WriteLine($"Before managed call on thread {ThreadInfo()}");

    await Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler);

    Console.WriteLine($"After managed call on thread {ThreadInfo()}");

    Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");

    await Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler);

    Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
}

The execution returns something like this:

Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "Event Loop 1" (6)

Where I expected

Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "" (1)
Gimly
  • 5,975
  • 3
  • 40
  • 75
  • Any reason why you don't use the System.Threading namespace (with a custom task scheduler for example))? https://stackoverflow.com/questions/30719366/run-work-on-specific-thread – Simon Mourier May 11 '20 at 17:04
  • 1
    @SimonMourier yes, first it's quite more work than just creating the event loop :). Then, I actually tried that at first but I couldn't figure out a good solution to make it work with RX Observables which I use to poll some of those API calls, this solution fits better with the rest of what I'm doing. And, more importantly to me, even if I do end up doing things differently, I'd like to understand **why** this doesn't work. It does something very counter intuitive and breaks my understanding of async/await in C# – Gimly May 11 '20 at 19:19
  • A console app is special because it has no SynchronizationContext set: https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/ . However, I've done some test with an EventLoopScheduler specific one and it still doesn't work. https://pastebin.com/raw/Kh2Yw9pn Smells bad. It works fine with "regular" Tasks/TaskScheduler. – Simon Mourier May 11 '20 at 21:57
  • Interesting. Is this netfx or netcore? I haven't done anything with EventLoopScheduler, but I assume the idea is that it installs a custom synchronization context that queues work on the initial thread? – Voo May 11 '20 at 22:39
  • I'll have to play around with Rx tomorrow if I find time, but my first guess is that the EventLoopScheduler doesn't set up a synchronization context itself, but is simply used inside the observable. When you do `await ` the current synchronization context is used, which is the default one for the console app and has nothing to do with the EventLoopScheduler, which means you get the default "pick thread from thread pool". There's some optimisations in there which can easily cause you to go back to the same thread for synchronous code. – Voo May 11 '20 at 22:58
  • @Voo it's .Net Core 3.1 in my case, I'd have to try out with netfx. As far as my understanding of the EventLoopScheduler goes, it's supposed to create a thread that will be used exclusively for the work scheduled on it. As you can see with my sample, the work passed inside the `Run` method is indeed run inside the thread created by the `EventLoopScheduler`, what is wrong is that the continuation after the await is also on that thread, which doesn't make sense in a async/await context. – Gimly May 12 '20 at 05:36
  • @Gimly Why wouldn't it make sense? await with the default synchronization context doesn't guarantee on which thread you continue your work. It just tries to avoid rescheduling on a different thread if not necessary since that would cause a performance hit. If you want to make sure you end up on the same thread from before the call to await you have to install a synchronization context on that level. Why its behavior changes with what's called is interesting but I don't think any spec is broken here. – Voo May 12 '20 at 05:51

1 Answers1

3

Tasks

A Task or a promise is a just an abstraction for callbacks. And async/await is just syntactic sugar for tasks.

Since it's a callback abstraction, await doesn't block the thread. Why does it look like it's blocking? That's because await rewrites your code into a state-machine which advances through its states when the Task being awaited completes.

It roughly gets rewritten like this:

switch (state)
{
    case 0:
        Console.WriteLine($"Before managed call on thread {ThreadInfo()}");
        Await(Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler));
        return;
    case 1:

        Console.WriteLine($"After managed call on thread {ThreadInfo()}");
        Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");
        Await(Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler));
        return;
    case 2:
        Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
        return;
}

The actual rewrite uses goto rather than a switch, but the concept is the same. So when a task completes, it call this state-machine with state += 1 - in the same threading context. You only see task pool threads when you use a task scheduler.

Leaks in the abstraction

The explanation for why you see this particular behavior:

After managed call on thread "" (1)

is quite complicated. It has to do with whether a scheduled thunk completes immediately or not. If you add a Thread.Sleep in the first managed call, you'll notice that the continuation runs on the event loop thread.

This is due to scheduling optimizations preferring to only queue if something is currently running. When you call ToTask(), you're using the default scheduler, which is the current thread scheduler.

The current thread scheduler works like this:

Free? Run immediately.

Busy? Queue the work.

The run immediately behavior is why you see the log running on the main thread. If you just add a

var scheduler = new EventLoopScheduler();
scheduler.Schedule(() => Thread.Sleep(1000));

to the very beginning, you make the event loop busy, causing everything to go into queuing, so you then see everything logs in the event loop thread. So this has nothing to do with P/Invoke.

To be clear, this isn't about the schedulers being specified for observing, but subscribing. When you convert Observables into other abstractions like Tasks, Enumerables, Blocking Joins etc., some internal complexity may leak through.

Asti
  • 12,447
  • 29
  • 38
  • This "just queue immediately on the current thread" behavior is a nice optimization, but can be quite problematic. I remember reading a post from one of the Stephens (I think) about how they had to work around this in some framework code to make sure the stack doesn't get blown (since if it is executed immediately the stack keeps growing, contrary to if it is posted to the thread pool). – Voo May 12 '20 at 20:43
  • I think I remember. The queue-if-required behavior is the cause of a [lot](https://stackoverflow.com/questions/61041484/why-does-repeated-enumerable-to-observable-conversion-block) of [subtle bugs](https://stackoverflow.com/questions/61012408/the-observable-repeat-is-unstoppable-is-it-a-bug-or-a-feature). But it's necessary for being performant. – Asti May 13 '20 at 03:00
  • Thanks a lot for the nice explanation @Asti, so how would you go about making sure that only the code inside the `Run` method runs on the specific thread and not the continuation after the `await`? – Gimly May 13 '20 at 05:45
  • If you really want it to run on the main thread, you'll have to use `Task.Wait` instead of `await` (not recommended). Just pass in an another scheduler to the `ToTask` method. – Asti May 13 '20 at 06:01
  • @Gimly (Still don't know anything about RX.Net so there might be better solutions), but the easiest workaround I can see would be to wrap the Task returned by `ToTask` in a task that is not completed when asked, which should mean the continuation is posted to the synchronization context instead of immediately continued. I very dimly remember there being a nicer solution for this in the aforementioned blog post, but my Googlefu is weak. – Voo May 13 '20 at 11:45
  • 1
    There's an `ObserveOn` overload which takes in a synchronization context. – Asti May 13 '20 at 17:00