0

To learn Await/Async (and to have a tool I can use at work for some weird projects) I have made an object that marshals calls to its public methods into a specific thread to be run there. Essentially each public async method, when called, checks its current thread against the target. If the thread matches, it runs its code as normal. If it doesn't match, it closures its inputs into a lambda that is then stored in a queue being maintained by the target thread. Here is my prototype, with a single public method DoThing.

public class ThreadedObject
    {
        private Thread thread;
        private ConcurrentQueue<Task> concurrentQ = new();
        public ThreadedObject()
        {
            thread = new(() => RunThread());
            thread.IsBackground = true;
            thread.Name = "ThreadedObjectRunner";
            thread.Start();
        }
        private bool IsInWorkerThread()
        {
            return Thread.CurrentThread == thread;
        }
        private void RunThread()
        {
            while (true)
            {
                if (Thread.CurrentThread == thread)
                {
                    Task task;
                    if (concurrentQ.TryDequeue(out task))
                    {
                        Debug.WriteLine("    Invoking a task");
                        task.RunSynchronously(new SynchronousScheduler());
                    }
                }
                else
                {
                    Debug.Fail("Threaded object running outside its thread");
                }
            }
        }

        public async Task<int> DoThing(int length)
        {
            if (this.IsInWorkerThread())
            {
                Debug.WriteLine("    Running task of length " + length + " on thread " + Thread.CurrentThread.Name);
                Thread.Sleep(length);
                Debug.WriteLine("    Finished task of length " + length);
                return length;
            }
            else
            {
                Debug.WriteLine("Queueing task of length " + length);
                return await Enqueue(()=>DoThing(length).Result);
                //return await Enqueue(async () => await DoThing(length)); //CS0029 cannot implicitly convert type 'void' to 'int'
            }
        }
       
        private Task Enqueue(Action action)
        {
            Task later = new Task(() => action.Invoke());
            concurrentQ.Enqueue(later);
            return later;
        }
        private Task<dynamic> Enqueue(Func<dynamic> action)
        {
            Task<dynamic> later = new Task<dynamic>(() => action.Invoke());
            concurrentQ.Enqueue(later);
            return later;
        }
        private class SynchronousScheduler : TaskScheduler
        {
            protected override IEnumerable<Task> GetScheduledTasks()
            {
                return Enumerable.Empty<Task>();
            }

            protected override void QueueTask(Task task)
            {
                base.TryExecuteTask(task);
            }

            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                base.TryExecuteTask(task);
                return true;
            }
        }
    }

So far this works as intended, surprisingly. However, I've been told not to use Task.Result when I can avoid it since it can deadlock in certain cases, and I am using it in the return from DoThing when queueing the lambda. Can/should/how would I replace that Task.Result with an await? You can see what I attempted commented out just below, but my grasp of what I'm doing is a bit too tenuous to get it to work.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
Taeo
  • 55
  • 7
  • I suspect you want to change `Task.Return` to `Task.Result` in your question title – Flydog57 Feb 04 '22 at 00:29
  • As a side note, the `RunThread` method incorporates a tight loop, which hijacks one core of the machine, and converts it from a calculation device to a tiny heat generator. You may want to take a look at the [`BlockingCollection`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1) class to solve this issue. – Theodor Zoulias Feb 07 '22 at 15:32
  • thats a good point, I'll have a look at Blocking Collection, thanks – Taeo Feb 07 '22 at 15:50

1 Answers1

1

When you do this:

return await Enqueue(async () => await DoThing(length));

It uses the Task version of Enqueue instead of the Task<dynamic> version. In reality, neither overload of Enqueue works for that line. If you comment out Task Enqueue(Action action), the compiler error changes to:

CS4010: Cannot convert async lambda expression to delegate type 'Func'. An async lambda expression may return void, Task or Task, none of which are convertible to 'Func'.

You need to change the parameter type in Enqueue (or create a third overload, if you need to) to Func<Task<dynamic>> to make it work.

Your use of new Task() and task.RunSynchronously() is suspicious, but if it works, it works. I'll have to let someone else comment on that. I don't know enough about it. Reading through the answers on this question might help you decide if you really want to be using task.RunSynchronously().

Gabriel Luci
  • 38,328
  • 4
  • 55
  • 84
  • If I change the signature for Enqueue to ```private Task Enqueue(Func> action)``` it does allow me to pass the async lambda to it, but then awaiting "DoThing" results in a Task instead of the int result, so I'm not quite there – Taeo Feb 07 '22 at 15:50