1

Unless you have extensive experience with SynchronizationContext, TaskScheduler, TaskFactory, async/await, and multiprocessing, there is no need to read further.

I've spent the last couple of years taking a large mostly-monolithic C# project that started with .NET 2.0 and has moved up through .NET 4.72 that we've been working on for the past 15 years and moving it to .NET 5/6 in order to get off 4.72 which has now been effectively deprecated, as well as to eventually host it on Linux, while keeping it running in production 24/7 with over a million users.
In the process, I've come to appreciate the elegance of the concept of async/await, but have also been very frustrated by the places where it does not behave sensibly.
I've come to terms with most of the quirks, but through this process have come to a point where .NET Core (.NET 6 at this point) appears to be a framework primarily for user-interactive software and not so much for other purposes such as queue processing. It seems that the async/await in .NET Core was designed primarily to handle applications built around the old windows message pump UI and ASP.NET one-stream-of-execution-per-request, to the exclusion of some other architectures.

Let me elaborate. Previous to these updates, much of our code did backend processing using a custom thread pool.
Given the nature of the project (half front-end web API services and half headless backend queue processing) and the need to optimize costs, we strive to hover near 97% CPU utilization on the backend processing servers, and we're leveraging parallel processing for some front-end operations as well. We've found that for any system, if you get much higher than 97% CPU utilization, you lose the ability to monitor what's going on on the server, as well as to detect infinite loops when they inevitably happen. We had algorithms that achieved approximately this level of utilization pretty well. There are several reasons we made a custom thread pool:

  1. When many operations get queued (which is generally needed to keep the CPU in the optimal range) the default ThreadPool, because it is not FIFO, inevitably starves some long-queued operations in favor of more recently queued operations, causing timeouts and other difficult problems. Compensating for this is wasteful and tedious when a FIFO processing engine can avoid the timeouts by processing things in the far more sensible FIFO order.
  2. It's extremely difficult (if not impossible) to determine when and how much work to queue to the default thread pool.
    The overall system CPU utilization is low whether it is underloaded or overloaded, and there doesn't seem to be any combination of properties to indicate which of those states it is in (a combination of PendingWorkItemCount and ThreadCount almost does it, but in practice, I haven't been able to come up with an algorithm that's reliable with arbitrary workloads).

As a result, achieving sustained 95%+ CPU utilization using the default thread pool with anything but toy code has eluded multiple prolonged efforts over many years. With a custom thread pool, we can easily see whether we have more capacity or not, combine that with the current CPU utilization, and use that to decide whether or not to start more work.

Enter async.
The concept is wonderful, the promise of being able to keep the logical code flow mostly the same as it has been and stop wasting threads waiting for IO and thereby greatly reduce memory usage and gain some performance improvements as well has driven everyone this direction, so much so that many of the libraries we consume no longer have non-async versions to consume. I have found no expert advice anywhere that recommends calling async code from non-async code under any circumstances. So, the process of asyncification began, with the async zombie virus propagating up the code from the bottom up, with some temporary wrappers to fool the asyncified code to run synchronously until we got all the way to the top. Inevitably we reached the code that used the custom thread pool, and so we attempted to convert it to async/await as well. However, using our own custom ThreadPool does not play well with async/await (there are numerous SO and blog posts about this). As a result, I decided to write my own SynchronizationContext/TaskScheduler that used our custom thread pool. There is very little documentation on how to do this properly (and none at all for many things), and I've spent months researching and implementing, reading blogs and SO posts by Stephen Toub and Stephen Cleary, as well as combing through the reference source. The implementation I've ended up with is much too large to include here. It works mostly, but processing still seemed to get posted to the default thread pool, causing all sorts of the overloading and ambiguity in underload/overload state described above. I combed through the project looking for patterns like Task t = MyAsyncCode(); and new Task(() => func()) that run code on the default TaskScheduler with no way to override that behavior. After eliminating all of those, problems with code running on the default TaskScheduler remained. I eventually tracked these issues down to the fact that using "await" on any significant code always switches processing back to the default task scheduler. This is unacceptable due to the issues mentioned above.
Eventually, I came across this SO post: Understanding the behavior of TaskScheduler.Current. After reading that and looking at the reference source for Task/TaskScheduler, etc. I see why the code behaves the way it does, but can't understand how this behavior makes any sense in any context outside the twisted world of ASP.NET and UI code. Does it really make sense for async work to completely ignore the 'current synchronization context' to which work could be queued and queue the work somewhere else instead?

This brings me to my question: Is there any way to create a custom TaskScheduler that actually gets used for all the processing of an async function? If not, it seems like a gaping hole in the system.

Here is an example of what I'd like to get to run:

public async Task SOSample()
{
    using MyTaskScheduler scheduler = MyTaskScheduler.Start();
    MySynchronizationContext context = new(scheduler);
    System.Threading.SynchronizationContext? oldContext = SynchronizationContext.Current;
    try
    {
        SynchronizationContext.SetSynchronizationContext(context);
        TaskCompletionSource<bool> completion = new();
        Task unwrapped = await Task.Factory.StartNew(
            () => VerifyTaskSchedulerRemainsCustom(), CancellationToken.None,
            TaskCreationOptions.None, scheduler);
        await unwrapped;
    }
    finally
    {
        SynchronizationContext.SetSynchronizationContext(oldContext);
    }
}

private async Task VerifyTaskSchedulerRemainsCustom()
{
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
    await Task.Yield();
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
    await Task.Delay(100).ConfigureAwait(true);
    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));

    // ... more arbitrary async processing

    Assert.IsFalse(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default));
}

Alternatively, is there a way to determine whether the default thread pool is underloaded or overloaded and to determine what overloaded it without a debugger (crucial for debugging problems in production)? This would be a ton of work to switch out, but is preferable in some ways.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
James
  • 3,551
  • 1
  • 28
  • 38
  • I'm not sure if there is an answer to your first question. There could be, I just don't know of one right now. There is a way to check the threads you have though. Here's Microsoft's official documentation on the matter https://learn.microsoft.com/en-us/dotnet/api/system.threading.threadpool.getavailablethreads?view=net-7.0 – Josh Heaps Feb 28 '23 at 23:42
  • I'm curious, how much infra costs you save with these optimizations? – Vlad DX Feb 28 '23 at 23:56
  • @JoshHeaps yep, well aware of that. That hasn't been useful in determining or controlling system lockups caused by the default thread pool. – James Mar 01 '23 at 00:02
  • @TheodorZoulias actually, Assert.AreEqual(TaskScheduler.Current, scheduler) is far more ambiguous, because it may depend on how Equals is defined for TaskScheduler. Thus the comment on the first one. – James Mar 01 '23 at 00:03
  • 1
    @VladDX, it depends on the infrastructure you're running on. If you're running on bare metal it can be 2-3x. If you're running on T* instances on AWS, there's not really any savings. If you're running on C*/M* instance, it can also be 2-3x. It all depends on how much CPU you can get the default thread pool to actually use with your workload(s). It looks like async/await would reduce memory usage by 4x if I could get it fully working. – James Mar 01 '23 at 00:06
  • @TheodorZoulias, Maybe, but I have yet to have Task.Delay(1) *not* swap out the task scheduler, on a very recent development system with 16-cores/32-threads. The point is the same, if you do something that actually has to wait, the task scheduler changes. – James Mar 01 '23 at 00:09
  • @TheodorZoulias, I can't think of a case where it would matter how you invoke `MyGenericAsyncStuff`. In my case, it's either directly or indirectly from the `Main` of a console project, or from a unit test. – James Mar 01 '23 at 00:12
  • @TheodorZoulias, fine, but why (100)? If there is a case where (1) completes before the await, then there are cases where (100) completes as well. Just less likely. – James Mar 01 '23 at 00:20
  • I've never seen [experimentally](https://dotnetfiddle.net/3kWlV6) a thread suspended for more than 50 milliseconds, so `Task.Delay(100)` seems like a safe bet to me. If you have doubts then increase the delay to a value that makes you confident as well. – Theodor Zoulias Mar 01 '23 at 00:26
  • @TheodorZoulias, this isn't a thread--this is a task, and that's a big part of the problem, I've seen them stop between continuations for way too long--tens of MINUTES in many cases--there is no number that would make me comfortable under arbitrary circumstances--this is sample code simply meant to convey the general problem not bulletproof fails-every-time code. That said, you are correct that I missed one step as I extracted this from my test code for sample purposes. That should be fixed now. You can make SOSample a unit test and it should fail. – James Mar 01 '23 at 03:18
  • While I admire your ambition, I worry that you may find that the developer costs of such an undertaking (and future costs of maintaining) this system in a non-typical manner may vastly dwarf any operational costs incurred by writing code that is able to accomplish your goal. As an aside, would you be able to share some of your implementation? There isn't enough code to run to verify the correctness of your test method. – David L Mar 01 '23 at 04:20
  • James the scenario that makes the `Task.Delay(1)` unpredictable is this: The current thread invokes the `Task.Delay(1)`, gets the incomplete `Task`, and then immediately gets suspended by the OS for a few milliseconds. Meanwhile the `Task` is completed on another thread. Afterwards the current thread is resumed by the OS, observes that the `Task` is completed, and continues synchronously after the `await`, without scheduling the continuation through the captured scheduler. So the particular scenario is about the OS, and how it slices the CPU-time to threads. It's not about TPL schedulers. – Theodor Zoulias Mar 01 '23 at 09:03
  • *"You can make `SOSample` a unit test and it should fail."* -- This confuses me. Do you want us to implement the `MyTaskScheduler` and `MySynchronizationContext` in a way that would make the `Assert`ions to fail? Most people want their assertions to be successful! – Theodor Zoulias Mar 01 '23 at 09:39

2 Answers2

3

I see why the code behaves the way it does, but can't understand how this behavior makes any sense in any context outside the twisted world of ASP.NET and UI code.

TaskScheduler.Current is a holdover from the TPL and predates async. The context captured by await is the current SynchronizationContext with a fallback to the current TaskScheduler. Most of the time the SyncCtx is captured and TaskScheduler.Current doesn't come into play at all.

This brings me to my question: Is there any way to create a custom TaskScheduler that actually gets used for all the processing of an async function?

You can create one that is used by default. Note that Task.Run, ConfigureAwait(false), and friends will continue to use the thread pool because that's exactly what they're supposed to do.

You haven't posted code, so I'm just going to guess that the problem is that your TaskScheduler isn't setting the current SynchronizationContext when executing its work items. await will capture the context and use that to schedule its continuation, but it doesn't restore the context; that's the responsibility of the context itself.

Feel free to check out my AsyncContext; it's a single-threaded context, but it provides both a SynchronizationContext and a TaskScheduler.

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • 1
    The `Task.Run` and `ConfigureAwait(false)` do not guarantee that *all* the work will run on the `ThreadPool`. I've posted an experimental demonstration is [this](https://stackoverflow.com/questions/75557398/will-cpu-bound-work-stay-on-thread-pool-thread-when-using-nested-async-await/75558334#75558334 "Will CPU-bound work stay on thread pool thread when using nested async-await?") answer. – Theodor Zoulias Mar 01 '23 at 10:57
  • 1
    Good point. Continuations could run on another thread, too. – Stephen Cleary Mar 01 '23 at 13:43
1

Here is a relatively simple custom TaskScheduler, that behaves as a custom ThreadPool with a predefined number of threads. It is based on a BlockingCollection<Task>:

public class CustomThreadPool : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue;
    private readonly Thread[] _threads;

    public CustomThreadPool(int threadsCount = 1)
    {
        _queue = new BlockingCollection<Task>();
        _threads = Enumerable.Range(0, threadsCount).Select(_ => new Thread(() =>
        {
            foreach (var task in _queue.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true }).ToArray();
        Array.ForEach(_threads, t => t.Start());
    }

    protected override void QueueTask(Task task)
    {
        try { _queue.Add(task); }
        catch (ObjectDisposedException)
        {
            ThreadPool.QueueUserWorkItem(_ => throw new TaskSchedulerException());
        }
    }

    protected override bool TryExecuteTaskInline(Task task,
        bool taskWasPreviouslyQueued)
    {
        if (Array.IndexOf(_threads, Thread.CurrentThread) < 0) return false;
        return TryExecuteTask(task);
    }

    public override int MaximumConcurrencyLevel => _threads.Length;
    protected override IEnumerable<Task> GetScheduledTasks() => _queue;

    public void Dispose()
    {
        _queue.CompleteAdding();
        Array.ForEach(_threads, t => t.Join());
        _queue.Dispose();
    }
}

Let's use this scheduler and check the assertions:

public static async Task Main()
{
    using (CustomThreadPool scheduler = new())
    {
        await Task.Factory.StartNew(() => VerifyTaskSchedulerRemainsCustom(),
            CancellationToken.None, TaskCreationOptions.DenyChildAttach,
            scheduler).Unwrap();

        Console.WriteLine($"Out: {TaskScheduler.Current}");
        await Task.Delay(500); // Give some time for the pending async void.
    }
    Console.WriteLine($"Finished");
}

private static async Task VerifyTaskSchedulerRemainsCustom()
{
    Console.WriteLine($"1: {TaskScheduler.Current}");
    await Task.Yield();
    Console.WriteLine($"2: {TaskScheduler.Current}");
    await Task.Delay(100).ConfigureAwait(true);
    Console.WriteLine($"3: {TaskScheduler.Current}");
    MyAsyncVoid();

    async void MyAsyncVoid()
    {
        Console.WriteLine($"4: {TaskScheduler.Current}");
        await Task.Yield();
        Console.WriteLine($"5: {TaskScheduler.Current}");
        await Task.Delay(100).ConfigureAwait(true);
        Console.WriteLine($"6: {TaskScheduler.Current}");
    }
}

Output:

1: CustomThreadPool
2: CustomThreadPool
3: CustomThreadPool
4: CustomThreadPool
5: CustomThreadPool
Out: System.Threading.Tasks.ThreadPoolTaskScheduler
6: CustomThreadPool
Finished

Online demo.

The TaskScheduler.Current is preserved in all check-points, even inside the async void method. Notice that there is no SynchronizationContext involved. A TaskScheduler is sufficient for this test. The drawback is that the custom TaskScheduler has no way of knowing when the async void method will complete. In the above demo the await Task.Factory.StartNew completes while the MyAsyncVoid is still in-flight. In case your application makes liberal use of async void methods, the above TaskScheduler-only approach might be unsuitable.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104