You need to provide a way for the await machinery to post a message back to that same thread.
ThreadPool threads only listen for new messages which are posted to the ThreadPool, so you'll have to create your own non-ThreadPool thread, and make a message queue that it listens for messages on. Write your own SynchronizationContext subclass which knows how to post messages to that queue, and install it on that thread.
With that in place, await will use that SynchronizationContext to post messages to the thread's queue, which the thread can process.
What follows is a fairly basic implementation of this: it's not the most efficient (it creates some excess allocations), but it should be fairly easy to follow.
The SimpleDispatcher
is a class which owns a message queue, and a thread which processes messages from that message queue, one by one. The thread has a custom SynchronizationContext
installed on it, which posts messages back to that message queue.
Make sure you dispose the SimpleDispatcher
! Otherwise its thread will keep running forever.
The dispatcher's PostAsync
method takes a Func<Task>
, which it executes on its thread, and it returns a Task
which completes when the Task
returned by the Func<Task>
completes (and which contains any exception). You can use this to post messages to the dispatcher, see the sample in Main
at the top.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
#nullable enable
public class Program
{
public static async Task Main()
{
using var dispatcher = SimpleDispatcher.StartNew();
var task1 = dispatcher.SendAsync(async () =>
{
Console.WriteLine($"Task 1: Thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(100);
Console.WriteLine($"Task 1: Thread {Thread.CurrentThread.ManagedThreadId}");
});
var task2 = dispatcher.SendAsync(async () =>
{
Console.WriteLine($"Task 2: Thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(100);
Console.WriteLine($"Task 2: Thread {Thread.CurrentThread.ManagedThreadId}");
});
await Task.WhenAll(task1, task2);
Console.WriteLine("All tasks complete");
}
}
public class SimpleDispatcher : IDisposable
{
private readonly BlockingCollection<QueueItem> queue = new();
private readonly CancellationTokenSource disposeCts = new();
private readonly Thread thread;
public SimpleDispatcher()
{
thread = new Thread(Run);
}
public static SimpleDispatcher StartNew()
{
var dispatcher = new SimpleDispatcher();
dispatcher.Start();
return dispatcher;
}
public void Start()
{
if (thread.IsAlive)
throw new InvalidOperationException("Already running");
thread.Start();
}
public void Post(SendOrPostCallback callback, object? state)
{
var item = new QueueItem
{
Delegate = () =>
{
callback(state);
return Task.CompletedTask;
},
Tcs = new(),
};
queue.Add(item);
// Re-throw any resulting exceptions on the ThreadPool
async void RethrowExceptions()
{
await item.Tcs.Task;
}
RethrowExceptions();
}
public void Send(SendOrPostCallback callback, object? state)
{
var item = new QueueItem
{
Delegate = () =>
{
callback(state);
return Task.CompletedTask;
},
Tcs = new(),
};
queue.Add(item);
item.Tcs.Task.Unwrap().GetAwaiter().GetResult();
}
public Task SendAsync(Func<Task> func)
{
var item = new QueueItem
{
Delegate = func,
Tcs = new(),
};
queue.Add(item);
return item.Tcs.Task.Unwrap();
}
private void Run()
{
SynchronizationContext.SetSynchronizationContext(new SimpleDispatcherSynchronizationContext(this));
try
{
while (true)
{
var item = queue.Take(this.disposeCts.Token);
try
{
var task = item.Delegate();
item.Tcs.SetResult(task);
}
catch (Exception e)
{
item.Tcs.SetException(e);
}
}
}
catch (OperationCanceledException) { }
}
public void Dispose()
{
disposeCts.Cancel();
}
private struct QueueItem
{
public Func<Task> Delegate;
// When complete, the message has been processed, and the TCS contains the Task returned by Delegate
public TaskCompletionSource<Task> Tcs;
}
}
public class SimpleDispatcherSynchronizationContext : SynchronizationContext
{
private readonly SimpleDispatcher dispatcher;
public SimpleDispatcherSynchronizationContext(SimpleDispatcher dispatcher) => this.dispatcher = dispatcher;
public override void Send(SendOrPostCallback d, object? state)
{
dispatcher.Send(d, state);
}
public override void Post(SendOrPostCallback d, object? state)
{
dispatcher.Post(d, state);
}
public override SynchronizationContext CreateCopy() => new SimpleDispatcherSynchronizationContext(dispatcher);
}
See it in action on DotNetFiddle.
All of that said, it's utterly pointless if you create a SimpleDispatcher
per partition, and then just run a single thing on it at a time (e.g. a single loop with an await
on it). You create a new dedicated thread, and then the thread's just blocked on the queue during each await
. You might as well just grap a thread off the ThreadPool (with Task.Run
), and use a blocking wait instead of an await
: same effect, less complexity.