I use C# iterators as a replacement for coroutines, and it has been working great. I want to switch to async/await as I think the syntax is cleaner and it gives me type safety. In this (outdated) blog post, Jon Skeet shows a possible way to implement it.

I chose to go a slightly different way (by implementing my own SynchronizationContext and using Task.Yield). This worked fine.

Then I realized there would be a problem; currently a coroutine doesn't have to finish running. It can be stopped gracefully at any point where it yields. We might have code like this:

private IEnumerator Sleep(int milliseconds)
{
    Stopwatch timer = Stopwatch.StartNew();
    do
    {
        yield return null;
    }
    while (timer.ElapsedMilliseconds < milliseconds);
}

private IEnumerator CoroutineMain()
{
    try
    {
        // Do something that runs over several frames
        yield return Coroutine.Sleep(5000);
    }
    finally
    {
        Log("Coroutine finished, either after 5 seconds, or because it was stopped");
    }
}

The coroutine works by keeping track of all enumerators in a stack. The C# compiler generates a Dispose function which can be called to ensure that the 'finally' block is correctly invoked in CoroutineMain, even if the enumeration isn't finished. This way we can stop a coroutine gracefully, and still ensure finally blocks are invoked, by calling Dispose on all the IEnumerator objects on the stack. This is basically manually unwinding.

When I wrote my implementation with async/await I realized that we would lose this feature, unless I'm mistaken. I then looked up other coroutine solutions, and it doesn't look like Jon Skeet's version handles it in any way either.

The only way I can think of to handle this would be to have our own custom 'Yield' function, which would check if the coroutine was stopped, and then raise an exception that indicated this. This would propagate up, executing finally blocks, and then be caught somewhere near the root. I don't find this pretty though, as 3rd party code could potentially catch the exception.

Am I misunderstanding something, and is this possible to do in an easier way? Or do I need to go the exception way to do this?

EDIT: More information/code has been requested, so here's some. I can guarantee this is going to be running on only a single thread, so there's no threading involved here. Our current coroutine implementation looks a bit like this (this is simplified, but it works in this simple case):

public sealed class Coroutine : IDisposable
{
    private class RoutineState
    {
        public RoutineState(IEnumerator enumerator)
        {
            Enumerator = enumerator;
        }

        public IEnumerator Enumerator { get; private set; }
    }

    private readonly Stack<RoutineState> _enumStack = new Stack<RoutineState>();

    public Coroutine(IEnumerator enumerator)
    {
        _enumStack.Push(new RoutineState(enumerator));
    }

    public bool IsDisposed { get; private set; }

    public void Dispose()
    {
        if (IsDisposed)
            return;

        while (_enumStack.Count > 0)
        {
            DisposeEnumerator(_enumStack.Pop().Enumerator);
        }

        IsDisposed = true;
    }

    public bool Resume()
    {
        while (true)
        {
            RoutineState top = _enumStack.Peek();
            bool movedNext;

            try
            {
                movedNext = top.Enumerator.MoveNext();
            }
            catch (Exception ex)
            {
                // Handle exception thrown by coroutine
                throw;
            }

            if (!movedNext)
            {
                // We finished this (sub-)routine, so remove it from the stack
                _enumStack.Pop();

                // Clean up..
                DisposeEnumerator(top.Enumerator);


                if (_enumStack.Count <= 0)
                {
                    // This was the outer routine, so coroutine is finished.
                    return false;
                }

                // Go back and execute the parent.
                continue;
            }

            // We executed a step in this coroutine. Check if a subroutine is supposed to run..
            object value = top.Enumerator.Current;
            IEnumerator newEnum = value as IEnumerator;
            if (newEnum != null)
            {
                // Our current enumerator yielded a new enumerator, which is a subroutine.
                // Push our new subroutine and run the first iteration immediately
                RoutineState newState = new RoutineState(newEnum);
                _enumStack.Push(newState);

                continue;
            }

            // An actual result was yielded, so we've completed an iteration/step.
            return true;
        }
    }

    private static void DisposeEnumerator(IEnumerator enumerator)
    {
        IDisposable disposable = enumerator as IDisposable;
        if (disposable != null)
            disposable.Dispose();
    }
}

Assume we have code like the following:

private IEnumerator MoveToPlayer()
{
  try
  {
    while (!AtPlayer())
    {
      yield return Sleep(500); // Move towards player twice every second
      CalculatePosition();
    }
  }
  finally
  {
    Log("MoveTo Finally");
  }
}

private IEnumerator OrbLogic()
{
  try
  {
    yield return MoveToPlayer();
    yield return MakeExplosion();
  }
  finally
  {
    Log("OrbLogic Finally");
  }
}

This would be created by passing an instance of the OrbLogic enumerator to a Coroutine, and then running it. This allows us to tick the coroutine every frame. If the player kills the orb, the coroutine doesn't finish running; Dispose is simply called on the coroutine. If execution was suspended inside MoveToPlayer's 'try' block, then calling Dispose on the top IEnumerator will, semantically, make the finally block in MoveToPlayer execute. Afterwards, the finally block in OrbLogic will execute. Note that this is a simple case; our real cases are much more complex.
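To make the unwinding order concrete, here is a minimal, self-contained sketch of the same idea. It is not our real Coroutine class: UnwindDemo, Inner and Outer are hypothetical names, and the stack handling is reduced to the bare minimum needed to show the order in which the finally blocks run when the enumerators are disposed early.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public static class UnwindDemo
{
    // Collects log lines so the order of the finally blocks can be inspected.
    public static readonly List<string> Log = new List<string>();

    static IEnumerator Inner()
    {
        try
        {
            while (true)
                yield return null; // never finishes on its own
        }
        finally
        {
            Log.Add("Inner finally");
        }
    }

    static IEnumerator Outer()
    {
        try
        {
            yield return Inner(); // hands control to the subroutine
        }
        finally
        {
            Log.Add("Outer finally");
        }
    }

    public static List<string> Run()
    {
        Log.Clear();
        var stack = new Stack<IEnumerator>();
        stack.Push(Outer());

        // One tick: Outer yields Inner, which is pushed and advanced to its first yield.
        stack.Peek().MoveNext();
        var inner = (IEnumerator)stack.Peek().Current;
        stack.Push(inner);
        inner.MoveNext();

        // Stop early: dispose from the top of the stack down (manual unwinding).
        while (stack.Count > 0)
            ((IDisposable)stack.Pop()).Dispose();

        return Log;
    }
}
```

Calling UnwindDemo.Run() should leave "Inner finally" followed by "Outer finally" in the log, which is the innermost-first order that a real exception unwind would give.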

I am struggling to implement similar behavior in the async/await version. The code for this version looks like this (error checking omitted):

public class Coroutine
{
    private readonly CoroutineSynchronizationContext _syncContext = new CoroutineSynchronizationContext();

    public Coroutine(Action action)
    {
        if (action == null)
            throw new ArgumentNullException("action");

        _syncContext.Next = new CoroutineSynchronizationContext.Continuation(state => action(), null);
    }

    public bool IsFinished { get { return !_syncContext.Next.HasValue; } }

    public void Tick()
    {
        if (IsFinished)
            throw new InvalidOperationException("Cannot resume Coroutine that has finished");

        SynchronizationContext curContext = SynchronizationContext.Current;
        try
        {
            SynchronizationContext.SetSynchronizationContext(_syncContext);

            // Next is guaranteed to have value because of the IsFinished check
            Debug.Assert(_syncContext.Next.HasValue);

            // Invoke next continuation
            var next = _syncContext.Next.Value;
            _syncContext.Next = null;

            next.Invoke();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(curContext);
        }
    }
}

public class CoroutineSynchronizationContext : SynchronizationContext
{
    internal struct Continuation
    {
        public Continuation(SendOrPostCallback callback, object state)
        {
            Callback = callback;
            State = state;
        }

        public SendOrPostCallback Callback;
        public object State;

        public void Invoke()
        {
            Callback(State);
        }
    }

    internal Continuation? Next { get; set; }

    public override void Post(SendOrPostCallback callback, object state)
    {
        if (callback == null)
            throw new ArgumentNullException("callback");

        if (Current != this)
            throw new InvalidOperationException("Cannot Post to CoroutineSynchronizationContext from different thread!");

        Next = new Continuation(callback, state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotSupportedException();
    }

    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
    {
        throw new NotSupportedException();
    }

    public override SynchronizationContext CreateCopy()
    {
        throw new NotSupportedException();
    }
}

I don't see how to implement similar behavior to the iterator version using this. Apologies in advance for the lengthy code!

EDIT 2: The new method seems to be working. It allows me to do stuff like:

private static async Task Test()
{
    // Second resume
    await Sleep(1000);
    // Unknown how many resumes
}

private static async Task Main()
{
    // First resume
    await Coroutine.Yield();
    // Second resume
    await Test();
}

Which provides a very nice way of building AI for games.

jakobbotsch
  • A follow-up blog post: [Asynchronous coroutines with C# 8.0 and IAsyncEnumerable](https://dev.to/noseratio/asynchronous-coroutines-with-c-8-0-and-iasyncenumerable-2e04) – noseratio Aug 26 '20 at 23:28

2 Answers

Updated, a follow-up blog post: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable.


I use C# iterators as a replacement for coroutines, and it has been working great. I want to switch to async/await as I think the syntax is cleaner and it gives me type safety...

IMO, it's a very interesting question, although it took me a while to fully understand it. Perhaps you didn't provide enough sample code to illustrate the concept. A complete app would help, so I'll try to fill this gap first. The following code illustrates the usage pattern as I understood it; please correct me if I'm wrong:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303

    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        private IEnumerator Sleep(int milliseconds)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    yield return null;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
        }

        void EnumeratorTest()
        {
            var enumerator = Sleep(100);
            enumerator.MoveNext();
            Thread.Sleep(500);
            //while (enumerator.MoveNext());
            ((IDisposable)enumerator).Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().EnumeratorTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }
    }
}

Here, Resource.Dispose gets called because of ((IDisposable)enumerator).Dispose(). If we don't call Dispose on the enumerator, we have to uncomment the commented-out while loop that calls MoveNext and let the iterator run to completion for proper unwinding.

Now, I think the best way to implement this with async/await is to use a custom awaiter:

using System;
using System.Collections;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    // https://stackoverflow.com/q/22852251/1768303
    public class Program
    {
        class Resource : IDisposable
        {
            public void Dispose()
            {
                Console.WriteLine("Resource.Dispose");
            }

            ~Resource()
            {
                Console.WriteLine("~Resource");
            }
        }

        async Task SleepAsync(int milliseconds, Awaiter awaiter)
        {
            using (var resource = new Resource())
            {
                Stopwatch timer = Stopwatch.StartNew();
                do
                {
                    await awaiter;
                }
                while (timer.ElapsedMilliseconds < milliseconds);
            }
            Console.WriteLine("Exit SleepAsync");
        }

        void AwaiterTest()
        {
            var awaiter = new Awaiter();
            var task = SleepAsync(100, awaiter);
            awaiter.MoveNext();
            Thread.Sleep(500);

            //while (awaiter.MoveNext()) ;
            awaiter.Dispose();
            task.Dispose();
        }

        public static void Main(string[] args)
        {
            new Program().AwaiterTest();
            GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
            GC.WaitForPendingFinalizers();
            Console.ReadLine();
        }

        // custom awaiter
        public class Awaiter :
            System.Runtime.CompilerServices.INotifyCompletion,
            IDisposable
        {
            Action _continuation;
            readonly CancellationTokenSource _cts = new CancellationTokenSource();

            public Awaiter()
            {
                Console.WriteLine("Awaiter()");
            }

            ~Awaiter()
            {
                Console.WriteLine("~Awaiter()");
            }

            public void Cancel()
            {
                _cts.Cancel();
            }

            // let the client observe cancellation
            public CancellationToken Token { get { return _cts.Token; } }

            // resume after await, called upon external event
            public bool MoveNext()
            {
                if (_continuation == null)
                    return false;

                var continuation = _continuation;
                _continuation = null;
                continuation();
                return _continuation != null;
            }

            // custom Awaiter methods
            public Awaiter GetAwaiter()
            {
                return this;
            }

            public bool IsCompleted
            {
                get { return false; }
            }

            public void GetResult()
            {
                this.Token.ThrowIfCancellationRequested();
            }

            // INotifyCompletion
            public void OnCompleted(Action continuation)
            {
                _continuation = continuation;
            }

            // IDisposable
            public void Dispose()
            {
                Console.WriteLine("Awaiter.Dispose()");
                if (_continuation != null)
                {
                    Cancel();
                    MoveNext();
                }
            }
        }
    }
}

When it's time to unwind, I request cancellation inside Awaiter.Dispose and drive the state machine to the next step (if there's a pending continuation). This leads to the cancellation being observed inside Awaiter.GetResult (which is called by the compiler-generated code). That throws OperationCanceledException, which unwinds the using statement, so the Resource gets properly disposed of. Finally, the task transitions to the canceled state (task.IsCanceled == true).

IMO, this is a simpler and more direct approach than installing a custom synchronization context on the current thread. It can be easily adapted for multithreading (some more details here).

This should indeed give you more freedom than with IEnumerator/yield. You could use try/catch inside your coroutine logic, and you can observe exceptions, cancellation and the result directly via the Task object.

Updated: AFAIK, there is no analog of the iterator's compiler-generated Dispose when it comes to async state machines. You really have to drive the state machine to its end when you want to cancel/unwind it. If you want to account for negligent use of try/catch swallowing the cancellation, I think the best you can do is check whether _continuation is non-null inside Awaiter.Dispose (after the MoveNext call) and throw a fatal exception out-of-band (using a helper async void method).
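Here is a rough sketch of that check, under a few assumptions. CheckedAwaiter is a hypothetical, trimmed-down variant of the Awaiter above, and for simplicity it throws InvalidOperationException directly from Dispose instead of going through an async void helper. The idea is the same either way: if a continuation is still pending after the cancelled resume, the coroutine must have caught the cancellation and awaited again.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the Awaiter above; names are illustrative only.
public sealed class CheckedAwaiter : INotifyCompletion, IDisposable
{
    Action _continuation;
    readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public CheckedAwaiter GetAwaiter() { return this; }
    public bool IsCompleted { get { return false; } }
    public void OnCompleted(Action continuation) { _continuation = continuation; }

    // Throws OperationCanceledException inside the coroutine once cancellation is requested.
    public void GetResult() { _cts.Token.ThrowIfCancellationRequested(); }

    // Resumes the coroutine; returns true if it awaited this awaiter again.
    public bool MoveNext()
    {
        if (_continuation == null) return false;
        var continuation = _continuation;
        _continuation = null;
        continuation();
        return _continuation != null;
    }

    public void Dispose()
    {
        if (_continuation == null) return;
        _cts.Cancel();
        // A pending continuation after the cancelled resume means the
        // coroutine caught the exception and kept going.
        if (MoveNext())
            throw new InvalidOperationException("Coroutine swallowed the cancellation.");
    }
}

public static class CheckedAwaiterDemo
{
    // Unwinds normally: the exception escapes and the task becomes canceled.
    public static async Task WellBehaved(CheckedAwaiter awaiter)
    {
        while (true) await awaiter;
    }

    // Negligent: catches everything and keeps awaiting.
    public static async Task Negligent(CheckedAwaiter awaiter)
    {
        while (true)
        {
            try { await awaiter; }
            catch (Exception) { }
        }
    }
}
```

With this sketch, disposing the awaiter used by WellBehaved should leave its task canceled, while disposing the one used by Negligent should throw, so the misbehaving code gets noticed instead of silently running on.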

noseratio
  • That is what I was afraid of. In an ideal world that is still a perfect solution, but our code base is unfortunately not an ideal world. I will do some more research before marking this as the answer. Thanks for the responses! – jakobbotsch Apr 05 '14 at 00:54
  • @Janiels, BTW, don't you think they managed to implement `IDisposable` for iterators because they decided to not allow `try/catch` inside iterators, only `try/finally`? So, your existing code base should already be compatible, as there is no use of `catch` inside your iterators. – noseratio Apr 05 '14 at 00:59
  • Another good point, but I think it's the other way around; they don't allow `try/catch` because they have to implement `IDisposable` (it might be possible, but probably really complex?). The use cases for iterators and `async/await` are different when you consider that iterators need to be able to stop gracefully without running to the end - so the compiler generates code to call Dispose after `foreach`, for example. – jakobbotsch Apr 05 '14 at 01:10
  • You're right that our code base should be compatible right now - I hadn't thought about that fully. I just dislike the idea that 3rd party code can mess up our program somewhere down the line - for example through something like `Root (resource) -> 3rd party (catch) -> sleep`. I really do think the syntax of the `async/await` coroutines is much nicer than the iterator version though, so it probably outweighs this problem (which we shouldn't see with properly written code). – jakobbotsch Apr 05 '14 at 01:12
  • If I don't want to involve threading, do I not have to implement my own `SynchronizationContext`? As far as I see, if I await the `Task` returned by `SleepAsync` it will involve a TaskAwaiter which uses a `TaskScheduler`. I also see it queues the continuation directly on the `Task` object though - is that the reason I don't need my own `SynchronizationContext`? – jakobbotsch Apr 05 '14 at 17:27
  • @Janiels, `TaskAwaiter` uses either captured s.context or `TaskScheduler.Current`. It also tries to inline the continuation (execute it on the same thread), if the captured s.context is the same that the current thread's s.context, or if there's no s.context. You don't need to implement a custom s.context, but if you don't want to use a custom awaiter, you'd need to pass a callback interface to drive your coroutine. Something like `Task IMoveNext.GetNextTask()`. You could return `Task.FromResult` for instant synchronous continuation. – noseratio Apr 06 '14 at 00:16
  • I got it working using no synchronization context with the custom awaiter, as your post initially suggested. It seems the behavior with sync context changes from .NET 4 with `Microsoft.Bcl.Async` to .NET 4.5. In .NET 4.5, the continuation after a task returns is inlined even when there is a custom sync context, while `Microsoft.Bcl.Async` always `Post`s it to the sync context. I wasn't aware of this inline behavior so I thought there would be threading issues without a custom sync context - I'm glad there's not. Thanks for all the help. – jakobbotsch Apr 06 '14 at 01:54
  • @Janiels, I wasn't aware of such `Microsoft.Bcl.Async` behavior either, it's good to know that. Glad it helped, I've learnt something new here too. – noseratio Apr 06 '14 at 01:56
  • I'm sorry that I keep commenting, but as I continue to work on this library I have more questions. I am pretty much relying on the fact that the continuations I enqueue on the tasks that use the custom awaiter here are inlined. If I understand correctly, inlining means that when a task finishes, it invokes its continuations immediately from the same context that it finishes in. Thus there is no threading involved here with that behavior. Is this inlining behavior documented somewhere, or am I relying on something that might change in the future? – jakobbotsch Apr 07 '14 at 00:15
  • @Janiels, it's a good question, you may want to post this as a separate question to give it more attention. First, AFAIU, if you use a custom awaiter like `await awaiter`, there's no task created for `awaiter`. You have complete control over continuation with `INotifyCompletion.OnCompleted`, it doesn't make sense to talk about inlining here. If you don't explicitly introduce a thread switch before you call `continuation`, it will all happen on the same stack frame. – noseratio Apr 07 '14 at 00:28
  • Now, it's a different matter if you use something like `task.ContinueWith(_ => continuation(), TaskContinuationOptions.ExecuteSynchronously)` to schedule the `continuation`, rather than calling it directly. In this case, it may or may not be inlined. – noseratio Apr 07 '14 at 00:29
  • I edited in some example code with this new async coroutines. The idea is to have several tasks. The tasks might await the custom awaiter, but they might also await helper tasks. Ultimately the only thing awaited will be the custom awaiter, and other tasks that await the custom awaiter. So far this seems to be working fine (everything is executed synchronously until something awaits the custom awaiter - when the continuation is then called, the tasks themselves also synchronously execute their continuations if they finish). – jakobbotsch Apr 07 '14 at 00:58
  • @Janiels, for `await task`, it should work as expected (i.e. inlined), unless there's a sync. context on the thread (which I think you don't have). But strictly speaking, it's not guaranteed for some edge cases, check [this](http://blogs.msdn.com/b/pfxteam/archive/2012/02/07/10265067.aspx). Also, be aware of [this](http://stackoverflow.com/q/22672984/1768303) `ConfigureAwait` behavior. – noseratio Apr 07 '14 at 01:26
  • I see, so basically `await task; RestOfFunction();` is the same as `await task.ContinueWith(_ => RestOfFunction(), TaskContinuationOptions.ExecuteSynchronously)`? And in those few cases (stack running out, thread aborting, task scheduler denying it) it will not be executed inline. Good to know - I shouldn't hit any of those cases. – jakobbotsch Apr 07 '14 at 02:25
  • @Janiels, it's not exactly 100% the same, check [`TaskAwaiter.OnCompleted` implementation](http://referencesource.microsoft.com/#mscorlib/system/runtime/compilerservices/TaskAwaiter.cs#45f10a20f8fdfd61). Logically, I expect it to be **the same in the *absence* of sync. context**. I.e., `await task` will be inlined. For some edge cases, it might be different though. Anyhow, you probably should not rely on any undocumented behavior. If you need to make sure it's the *same* thread, put an explicit check after `await` and throw (I know it's ugly :)) – noseratio Apr 07 '14 at 02:34
  • I see it depends on `continueOnCapturedContext`, which is what you can change with `ConfigureAwait`, I suppose. I will be nulling the sync context so there is no chance that the continuation is queued on another thread. Checking for the same thread after every await is not something I can rely on 3rd party devs doing, and as you said, it's not very pretty. Perhaps I should use my own sync context which calls continuations on the single thread, along with the custom awaiter. Then I should be ensured we're on a single thread (aside from the `ConfigureAwait(false)` case you linked). – jakobbotsch Apr 07 '14 at 11:12

Updated, this has evolved to a blog post: Asynchronous coroutines with C# 8.0 and IAsyncEnumerable.


It's 2020, and my other answer about await and coroutines is quite outdated by today's C# language standards. C# 8.0 has introduced support for asynchronous streams with new features like IAsyncEnumerable<T>, IAsyncEnumerator<T>, await foreach, IAsyncDisposable and await using.

To familiarize yourself with the concept of asynchronous streams, I highly recommend reading "Iterating with Async Enumerables in C# 8", by Stephen Toub.

Together, these new features provide a great base for implementing asynchronous co-routines in C# in a much more natural way.
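As a minimal illustration of why async streams fit the original question, here is a small sketch, assuming C# 8.0 and .NET Core 3.0 or later. AsyncStreamDemo, CountAsync and TakeFirstAsync are hypothetical names. The async iterator can both await and yield, and leaving the await foreach loop early disposes the enumerator, which runs the iterator's finally block without driving it to the end.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Set by the iterator's finally block so the cleanup can be observed.
    public static bool CleanedUp;

    // An async iterator: it can await and yield in the same method.
    public static async IAsyncEnumerable<int> CountAsync()
    {
        try
        {
            for (var i = 1; ; i++)
            {
                await Task.Yield(); // stand-in for real asynchronous work
                yield return i;
            }
        }
        finally
        {
            CleanedUp = true;
        }
    }

    public static async Task<int> TakeFirstAsync()
    {
        await foreach (var item in CountAsync())
        {
            // Leaving the loop early makes await foreach call DisposeAsync,
            // which runs the iterator's finally block.
            return item;
        }
        return -1;
    }
}
```

Awaiting TakeFirstAsync() should return the first item, and CleanedUp should be true afterwards even though the iterator itself never runs to completion.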

Wikipedia provides a good explanation of what co-routines (aka coroutines) generally are. What I'd like to show here is how co-routines can be async, suspending their execution flow by using await and arbitrarily swapping the producer/consumer roles with each other, with C# 8.0.

The code fragment below should illustrate the concept. Here we have two co-routines, CoroutineA and CoroutineB which execute cooperatively and asynchronously, by yielding to each other as their pseudo-linear execution flow goes on.

namespace Tests
{
    [TestClass]
    public class CoroutineProxyTest
    {
        const string TRACE_CATEGORY = "coroutines";

        /// <summary>
        /// CoroutineA yields to CoroutineB
        /// </summary>
        private async IAsyncEnumerable<string> CoroutineA(
            ICoroutineProxy<string> coroutineProxy,
            [EnumeratorCancellation] CancellationToken token)
        {
            await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
            {
                const string name = "A";
                var i = 0;

                // yielding 1
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 2
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 3
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";
            }
        }

        /// <summary>
        /// CoroutineB yields to CoroutineA
        /// </summary>
        private async IAsyncEnumerable<string> CoroutineB(
            ICoroutineProxy<string> coroutineProxy,
            [EnumeratorCancellation] CancellationToken token)
        {
            await using (var coroutine = await coroutineProxy.AsAsyncEnumerator(token))
            {
                const string name = "B";
                var i = 0;

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 1
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);

                // yielding 2
                Trace.WriteLine($"{name} about to yeild: {++i}", TRACE_CATEGORY);
                yield return $"{i} from {name}";

                // receiving
                if (!await coroutine.MoveNextAsync())
                {
                    yield break;
                }
                Trace.WriteLine($"{name} received: {coroutine.Current}", TRACE_CATEGORY);
            }
        }

        /// <summary>
        /// Testing CoroutineA and CoroutineB cooperative execution
        /// </summary>
        [TestMethod] 
        public async Task Test_Coroutine_Execution_Flow()
        {
            // Here we execute two coroutines, CoroutineA and CoroutineB,
            // which asynchronously yield to each other

            //TODO: test cancellation scenarios
            var token = CancellationToken.None;

            using (var apartment = new Tests.ThreadPoolApartment())
            {
                await apartment.Run(async () =>
                {
                    var proxyA = new CoroutineProxy<string>();
                    var proxyB = new CoroutineProxy<string>();

                    var listener = new Tests.CategoryTraceListener(TRACE_CATEGORY);
                    Trace.Listeners.Add(listener);
                    try
                    {
                        // start both coroutines
                        await Task.WhenAll(
                            proxyA.Run(ct => CoroutineA(proxyB, ct), token),
                            proxyB.Run(ct => CoroutineB(proxyA, ct), token))
                            .WithAggregatedExceptions();
                    }
                    finally
                    {
                        Trace.Listeners.Remove(listener);
                    }

                    var traces = listener.ToArray();
                    Assert.AreEqual(traces[0], "A about to yeild: 1");
                    Assert.AreEqual(traces[1], "B received: 1 from A");
                    Assert.AreEqual(traces[2], "B about to yeild: 1");
                    Assert.AreEqual(traces[3], "A received: 1 from B");
                    Assert.AreEqual(traces[4], "A about to yeild: 2");
                    Assert.AreEqual(traces[5], "B received: 2 from A");
                    Assert.AreEqual(traces[6], "B about to yeild: 2");
                    Assert.AreEqual(traces[7], "A received: 2 from B");
                    Assert.AreEqual(traces[8], "A about to yeild: 3");
                    Assert.AreEqual(traces[9], "B received: 3 from A");
                });
            }
        }
    }
}

The test's output looks like this:

coroutines: A about to yeild: 1
coroutines: B received: 1 from A
coroutines: B about to yeild: 1
coroutines: A received: 1 from B
coroutines: A about to yeild: 2
coroutines: B received: 2 from A
coroutines: B about to yeild: 2
coroutines: A received: 2 from B
coroutines: A about to yeild: 3
coroutines: B received: 3 from A

I currently use asynchronous co-routines in some of my automated UI testing scenarios. E.g., I might have an asynchronous test workflow logic that runs on a UI thread (that'd be CoroutineA) and a complementary workflow that runs on a ThreadPool thread as a part of a [TestMethod] method (that'd be CoroutineB).

Then I could do something like await WaitForUserInputAsync(); yield return true; to synchronize at certain key points of the cooperative execution flow of CoroutineA and CoroutineB.

Without yield return, I'd have to use some form of asynchronous synchronization primitive, like Stephen Toub's AsyncManualResetEvent. I personally feel that using co-routines is a more natural way of doing this kind of synchronization.
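For comparison, an event-style primitive of that kind could look roughly like the sketch below. This is a simplified illustration in the spirit of AsyncManualResetEvent, not Stephen Toub's exact code. SimpleAsyncManualResetEvent is a hypothetical name, and details such as the use of Volatile.Read and RunContinuationsAsynchronously are my own choices here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Simplified sketch of an async manual-reset event built on TaskCompletionSource.
public sealed class SimpleAsyncManualResetEvent
{
    private TaskCompletionSource<bool> _tcs = NewTcs();

    private static TaskCompletionSource<bool> NewTcs()
    {
        // Avoid running waiters' continuations inline on the thread that calls Set.
        return new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    // Completes once Set has been called; stays completed until Reset.
    public Task WaitAsync() { return Volatile.Read(ref _tcs).Task; }

    public void Set() { Volatile.Read(ref _tcs).TrySetResult(true); }

    public void Reset()
    {
        while (true)
        {
            var tcs = Volatile.Read(ref _tcs);
            // Already unsignaled: nothing to do.
            if (!tcs.Task.IsCompleted) return;
            // Swap in a fresh source; retry if another thread got there first.
            if (Interlocked.CompareExchange(ref _tcs, NewTcs(), tcs) == tcs) return;
        }
    }
}
```

Each side of a workflow would then call await ev.WaitAsync() at a sync point while the other side calls ev.Set(), which means extra shared state for every sync point. With co-routines, the yield return itself plays that role.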

The code for CoroutineProxy (which drives the execution of co-routines) is still a work-in-progress. It currently uses TPL Dataflow's BufferBlock as a proxy queue to coordinate the asynchronous execution, and I am not sure yet whether it is an optimal way of doing that. Currently, this is what it looks like:

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

#nullable enable

namespace Tests
{
    public interface ICoroutineProxy<T>
    {
        public Task<IAsyncEnumerable<T>> AsAsyncEnumerable(CancellationToken token = default);
    }

    public static class CoroutineProxyExt
    {
        public async static Task<IAsyncEnumerator<T>> AsAsyncEnumerator<T>(
            this ICoroutineProxy<T> @this,
            CancellationToken token = default)
        {
            return (await @this.AsAsyncEnumerable(token)).GetAsyncEnumerator(token);
        }
    }

    public class CoroutineProxy<T> : ICoroutineProxy<T>
    {
        readonly TaskCompletionSource<IAsyncEnumerable<T>> _proxyTcs =
            new TaskCompletionSource<IAsyncEnumerable<T>>(TaskCreationOptions.RunContinuationsAsynchronously);

        public CoroutineProxy()
        {
        }

        private async IAsyncEnumerable<T> CreateProxyAsyncEnumerable(
            ISourceBlock<T> bufferBlock,
            [EnumeratorCancellation] CancellationToken token)
        {
            var completionTask = bufferBlock.Completion;
            while (true)
            {
                var itemTask = bufferBlock.ReceiveAsync(token);
                var any = await Task.WhenAny(itemTask, completionTask);
                if (any == completionTask)
                {
                    // observe completion exceptions if any
                    await completionTask; 
                    yield break;
                }
                yield return await itemTask;
            }
        }

        async Task<IAsyncEnumerable<T>> ICoroutineProxy<T>.AsAsyncEnumerable(CancellationToken token)
        {
            using (token.Register(() => _proxyTcs.TrySetCanceled(), useSynchronizationContext: true))
            {
                return await _proxyTcs.Task;
            }
        }

        public async Task Run(Func<CancellationToken, IAsyncEnumerable<T>> routine, CancellationToken token)
        {
            token.ThrowIfCancellationRequested();

            var bufferBlock = new BufferBlock<T>();
            var proxy = CreateProxyAsyncEnumerable(bufferBlock, token);
            _proxyTcs.SetResult(proxy); // throw if already set

            try
            {
                //TODO: do we need to use routine(token).WithCancellation(token) ?
                await foreach (var item in routine(token))
                {
                    await bufferBlock.SendAsync(item, token);
                }
                bufferBlock.Complete();
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)bufferBlock).Fault(ex);
                throw;
            }
        }
    }
}
noseratio