0

I am trying to produce a stable IAsyncEnumerator for a collection. I am using C# 7.3 and the frameworks I am targeting are .Net Standard 2.1 and .Net Standard 2.0.

The enumerator is stable only when I debug. I essentially grab a list of objects 4 in length. the Promise becomes invalid near the last item in the list or under a condition that I do not quite have an understanding on. I am trying to broaden my tool kit, however asynchronous programming needs some work. I have looked around the web looking for articles and examples of a stable iterator. But none are to be found. in fact, the only thing I have found is that C# 8 will build it for you. Which I completely disagree with. One should understand the tools that they choose to use.

I have produced the following code based on what I could find.

The Unit Test

[Test]
public void ShouldBeAbleToLoadResultSet()
{
    FluentActions.Invoking(async () =>
    {
        var cts = new CancellationTokenSource();

        var people = Context.People
        .AsAsyncSubSonicQueryable()
        .LoadAsync(cts.Token);

        int cnt = 0;

        await foreach(Person person in people.Result
            .WithCancellation(cts.Token)
            .ConfigureAwait(true))
        {
            person.FullName.Should().Be(String.Format("{0}, {1}{2}",
                person.FamilyName, person.FirstName,
                string.IsNullOrEmpty(person.MiddleInitial?.Trim()) ? "" : $" {person.MiddleInitial}."));

            cnt++;
        }

        cnt.Should().Be(Context.People.Count);

    }).Should().NotThrow();

}

The partial class file that deals with the IAsyncEnumerator

using System;
using System.Collections.Generic;
using System.Diagnostics;
#if NETSTANDARD2_1
using System.Diagnostics.CodeAnalysis;
#endif
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace SubSonic.Infrastructure
{
    public sealed partial class SubSonicCollection<TElement>
        : IAsyncEnumerator<TElement>
        , IAsyncStateMachine
        , IValueTaskSource<bool>
        , IValueTaskSource
        , IDisposable
    {
        private static readonly Action<object> CallbackCompleted = _ => Debug.Assert(false, "should not be invoked!");
        private Action<Object> continuation;
        private AsyncIteratorMethodBuilder asyncMethodBuilder;
        private CancellationTokenSource CancellationTokenSource;
        public CancellationToken CancellationToken => CancellationTokenSource?.Token ?? default;
        private ManualResetValueTaskSourceCore<bool> PromiseOfValueOrEnd;
        private TaskAwaiter StateAwaiter;
        private IAsyncStateMachine sm;

        private ExecutionContext executionContext;
        private object scheduler;
        private object state;

        private short token;
        private bool? result;
        private int index;
        private int? istate;
        private bool disposed;

        public TElement Current { get; private set; }

        private TElement GetElementAt(int idx)
        {
            if (TableData is IEnumerable<TElement> data)
            {
                return data.ElementAt(idx);
            }

            return default(TElement);
        }

        IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken cancellationToken)
        {
            asyncMethodBuilder = new AsyncIteratorMethodBuilder();
            CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            disposed = false;

            return this;
        }

        public async ValueTask DisposeAsync()
        {
            await Task.CompletedTask.ConfigureAwait(false);

            Dispose();
        }

        public void MoveNext()
        {
            try
            {
                TaskAwaiter awaiter;

                if (disposed)
                {
                    goto DONE_ITERATING;
                }

                switch(istate.GetValueOrDefault(-1))
                {
                    case 0:
                        awaiter = StateAwaiter;
                        goto DONE_AWAIT;
                    case -4:
                        index++;
                        goto LOOP_CONDITION;
                    default:
                        index = 0;
                        goto LOOP_CONDITION;
                }
            LOOP_CONDITION:
                if (index >= Count)
                {
                    goto DONE_ITERATING;
                }
                awaiter = Task.Delay(index, CancellationToken).GetAwaiter();
                if(!awaiter.IsCompleted)
                {
                    istate = 0;
                    StateAwaiter = awaiter;
                    SetStateMachine(this);
                    asyncMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref sm);
                    return;
                }
            DONE_AWAIT:
                awaiter.GetResult();
                Current = GetElementAt(index);
                istate = -4;
                goto RETURN_TRUE_FROM_MOVENEXTASYNC;
            DONE_ITERATING:
                istate = -2;
                CancellationTokenSource?.Dispose();
                PromiseOfValueOrEnd.SetResult(result: false);
                return;
            RETURN_TRUE_FROM_MOVENEXTASYNC:
                PromiseOfValueOrEnd.SetResult(result: true);
            }
            catch(OperationCanceledException ex)
            {
                AbortExecution(ex);
            }
            catch(InvalidOperationException ex)
            {
                AbortExecution(ex);
            }

        }

        private void AbortExecution(Exception ex)
        {
            istate = -2;
            CancellationTokenSource?.Dispose();
            PromiseOfValueOrEnd.SetException(ex);
        }

        public ValueTask<bool> MoveNextAsync()
        {
            if (istate == -2)
            {
                return default;
            }

            ResetAndReleaseOperation();

            SetStateMachine(this);

            asyncMethodBuilder.MoveNext(ref sm);

            if (GetStatus(this.token) == ValueTaskSourceStatus.Succeeded)
            {
                GetResult(this.token);

                Debug.Assert(result.HasValue, "failed to complete");

                return new ValueTask<bool>(result.GetValueOrDefault());
            }
            else
            {
                return new ValueTask<bool>(this, token);
            }
        }

        public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
            {
                this.executionContext = ExecutionContext.Capture();
            }

            if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
            {
                SynchronizationContext sc = SynchronizationContext.Current;

                if (sc != null && sc.GetType() == typeof(SynchronizationContext))
                {
                    this.scheduler = sc;
                }
                else
                {
                    TaskScheduler ts = TaskScheduler.Current;
                    if (ts != TaskScheduler.Default)
                    {
                        this.scheduler = ts;
                    }
                }
            }

            this.state = state;

            var previousContinuation = Interlocked.CompareExchange(ref this.continuation, continuation, null);

            if (previousContinuation != null)
            {
                if (!ReferenceEquals(previousContinuation, CallbackCompleted))
                {
                    throw Error.InvalidOperation(SubSonicErrorMessages.ErrorPreviousContinuation);
                }

                this.executionContext = null;
                this.state = null;
            }

            InvokeContinuation(continuation, state);
        }

        private void InvokeContinuation(Action<object> continuation, object state)
        {
            if (continuation is null)
            {
                return;
            }

            object scheduler = this.scheduler ?? TaskScheduler.Default;

            if (scheduler != null)
            {
                if (scheduler is SynchronizationContext sc)
                {
                    sc.Post(s =>
                    {
                        var t = (Tuple<Action<object>, object>)s;
                        t.Item1(t.Item2);
                    }, Tuple.Create(continuation, state));
                }
                else if (scheduler is TaskScheduler ts)
                {
                    Task.Factory.StartNew(continuation, state, CancellationToken, TaskCreationOptions.DenyChildAttach, ts);
                }
                else
                {
                    throw Error.NotSupported($"{scheduler}");
                }
            }
#if NETSTANDARD2_1
            else if (PromiseOfValueOrEnd.RunContinuationsAsynchronously)
            {
                ThreadPool.QueueUserWorkItem(continuation, state, preferLocal: true);
            }
#endif
            else
            {
                continuation(state);
            }

        }

        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {
            sm = stateMachine;
        }

        public ValueTaskSourceStatus GetStatus(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            return PromiseOfValueOrEnd.GetStatus(token);
        }

        public void GetResult(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            this.result = PromiseOfValueOrEnd.GetResult(token);
        }

        bool IValueTaskSource<bool>.GetResult(short token)
        {
            if (this.token != token)
            {
                throw Error.InvalidOperation(SubSonicErrorMessages.MultipleContinuations);
            }

            while (GetStatus(token) == ValueTaskSourceStatus.Pending)
            {
                asyncMethodBuilder.MoveNext(ref sm);
            }

            GetResult(token);

            return result.Value;
        }

        private void ResetAndReleaseOperation()
        {
            CancellationToken.ThrowIfCancellationRequested();

            PromiseOfValueOrEnd.Reset();

            this.token = PromiseOfValueOrEnd.Version;
            this.continuation = null;
            this.scheduler = null;
            this.state = null;
            this.result = null;
        }

        private void Dispose(bool disposing)
        {
            if (!disposed)
            {
                if (disposing)
                {
                    CancellationTokenSource?.Dispose();
                    CancellationTokenSource = null;

                    istate = null;
                }

                // TODO: free unmanaged resources (unmanaged objects) and override finalizer
                // TODO: set large fields to null
                disposed = true;
            }
        }

        // // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
        // ~SubSonicCollection()
        // {
        //     // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
        //     Dispose(disposing: false);
        // }

        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }
}
  • Does `IAsyncEnumerator ` work in 2.0 i am surprised it does. Have you tried to minimise this example. There seems to be a lot of code here to test this feature and or debug. – TheGeneral Jul 08 '20 at 04:00
  • `await foreach(Person person in people)` – Jeremy Lakeman Jul 08 '20 at 04:18
  • `await foreach` is part of C#'s async streams, which were only introduced in C# 8, which is only supported in .NET Standard 2.1 (natively at least [some exceptions apply](https://stackoverflow.com/a/57020770/9363973)). So you'll either need to drop support for .NET Standard 2.0 and therefore the entire .NET Framework, or use some workaround – MindSwipe Jul 08 '20 at 04:36
  • 1
    Ouaou! I admire your dedication, but writing 300+ lines of extremely obscure and complicated code in order to avoid using a language feature (an [async iterator](https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8)) that does the same thing with 5 lines of code, doesn't sound like the correct way of building software! – Theodor Zoulias Jul 08 '20 at 04:37
  • Did you try to use [Microsoft.Bcl.AsyncInterfaces](https://www.nuget.org/packages/Microsoft.Bcl.AsyncInterfaces/) package, which can be use with .NET Standard 2.0 and .NET FW 4.6.1 and later? – Pavel Anikhouski Jul 08 '20 at 07:00
  • an async iterator small problem, this is C# 8.0 I am working with C# 7.3. there is a difference and the compiler does not build it for you. – Kenneth Carter Jul 08 '20 at 17:14
  • 1
    any how I scrapped the attempt and found an actively supported async enumerable open source project that addresses the asynchronous enumeration and meets my requirements of not crossing into the system namespace. https://github.com/Dasync/AsyncEnumerable – Kenneth Carter Jul 08 '20 at 21:56

0 Answers0