I have a WPF application that uses a custom-implemented ForEachParallelAsync extension method:

public static Task ForEachParallelAsync<T>(this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism)
{
    return Task.WhenAll(Partitioner.Create(source).GetPartitions(maxDegreeOfParallelism).Select(partition => Task.Run(async delegate
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                await body(partition.Current);
            }
        }
    })));
}

The issue I'm facing is that when the body contains an async statement, executing it switches to a different thread after the await, which causes the exception "The calling thread cannot access this object because a different thread owns it."

The alternatives are a plain foreach loop or the Dispatcher (which I do not want to use). Is it possible to point out the issue with the current implementation?

The Piatre
  • When an `await` is run, it captures the current `SynchronizationContext` (if any). When the `Task` being awaited completes, the completion (the line after the `await`) is posted to that `SynchronizationContext` if one was captured, otherwise it's posted to the ThreadPool. You're running that `await` inside a `Task.Run`, which plops it on a ThreadPool thread, which has no `SynchronizationContext`. All of your continuations are therefore posted to the ThreadPool. What do you *want* to happen? – canton7 Apr 30 '21 at 11:04
  • @canton7 I want it to run the statements after the await on the same thread as the statements before the await. Does that make sense? – The Piatre Apr 30 '21 at 11:11
  • In that case, you need to provide a way for the await machinery to post a message back to that same thread. ThreadPool threads only listen for new messages which are posted to the ThreadPool, so you'll have to create your own non-ThreadPool thread, and make a message queue that it listens for messages on. Write your own `SynchronizationContext` subclass which knows how to post messages to that queue, and install it on that thread. Then `await` will use that `SynchronizationContext` to post messages to the thread's queue, which the thread can process – canton7 Apr 30 '21 at 11:14
  • @canton7 is it possible to have a sample code snippet? – The Piatre Apr 30 '21 at 11:23
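A quick way to see canton7's point outside WPF is a small console sketch (the class and method names are hypothetical): inside `Task.Run` there is no `SynchronizationContext`, so the continuation after the `await` goes to whichever ThreadPool thread is free, and the two thread IDs it reports may or may not match from run to run.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContextDemo
{
    // Reports whether a SynchronizationContext was present inside Task.Run,
    // plus the managed thread IDs observed before and after an await.
    public static Task<(bool HasContext, int Before, int After)> CaptureInfo() =>
        Task.Run(async () =>
        {
            // ThreadPool threads have no SynchronizationContext installed.
            bool hasContext = SynchronizationContext.Current != null;
            int before = Environment.CurrentManagedThreadId;

            // With no context to capture, the continuation is scheduled on
            // whatever ThreadPool thread is available when the delay completes.
            await Task.Delay(100);

            int after = Environment.CurrentManagedThreadId;
            return (HasContext: hasContext, Before: before, After: after);
        });
}
```

On a WPF UI thread, `SynchronizationContext.Current` is a `DispatcherSynchronizationContext`, which is why the same `await` written directly in an event handler resumes on the UI thread.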

2 Answers

Your problem can be solved by replacing the Task.Run with the simple Run implementation below:

static async Task Run(Func<Task> action) => await action();

The Task.Run method executes the supplied delegate on a ThreadPool thread, making it illegal to access UI components inside the delegate. By contrast, the Run method above executes the delegate on the current thread (most likely the UI thread), and because the UI thread has a SynchronizationContext, the code after each await resumes on that thread as well. This means that the asynchronous delegates will not be invoked in parallel (making the name of the ForEachParallelAsync method a bit misleading). This shouldn't be a problem, because your intention is most likely not to parallelize the creation of the tasks, but to have multiple tasks in flight concurrently. In other words, the goal is asynchronous concurrency, not parallelism. Parallelism requires many threads, while asynchronous concurrency requires no threads.
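A sketch of the question's method with the `Task.Run` wrapper replaced directly by an `async` delegate, as confirmed in the comments on this answer. The names `ForEachConcurrentAsync`, `AsyncConcurrencyExtensions` and `maxDegreeOfConcurrency` are hypothetical, chosen to reflect that this is concurrency rather than parallelism. Each partition's first `MoveNext` and `body` call happen synchronously on the calling thread; when that thread is the WPF UI thread, the code after each `await` resumes there too.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncConcurrencyExtensions
{
    // Same partitioning as the question's method, but each worker is an async
    // delegate started on the calling thread instead of inside Task.Run.
    public static Task ForEachConcurrentAsync<T>(
        this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfConcurrency)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(maxDegreeOfConcurrency)
                .Select(async partition =>
                {
                    using (partition)
                    {
                        // One element at a time per partition, so at most
                        // maxDegreeOfConcurrency bodies are in flight overall.
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                }));
    }
}
```

In a console app (no `SynchronizationContext`) the continuations still run on ThreadPool threads, but the limit on in-flight bodies holds either way.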

Honestly, the ForEachParallelAsync implementation used by your application has flaws that are unrelated to the Task.Run/Run discussion above, and my suggestion is to avoid it. You can find better implementations here, here and here, based on the SemaphoreSlim class or the TPL Dataflow library. The problem with the ForEachParallelAsync implementation in your question, which probably originates from this blog post, is its behavior when some of the tasks fail. Every exception kills one worker task, and the process continues with a reduced level of concurrency. If you are unlucky enough to have exactly maxDegreeOfParallelism - 1 early exceptions, the last remaining worker will slowly process all the elements alone, and the exceptions will only surface at the very end. This problem may not affect you if your code handles all errors and doesn't allow them to propagate.
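A minimal sketch of the `SemaphoreSlim`-based approach, written for illustration rather than copied from the implementations linked above; `ForEachThrottledAsync` and `ThrottledExtensions` are hypothetical names. Each element gets its own task, and a failure only releases that element's slot, so the concurrency level does not shrink when exceptions occur.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions
{
    // Starts one task per element, but never more than maxDegreeOfConcurrency at a time.
    public static async Task ForEachThrottledAsync<T>(
        this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfConcurrency)
    {
        var throttler = new SemaphoreSlim(maxDegreeOfConcurrency);
        var tasks = new List<Task>();

        foreach (var item in source)
        {
            // Wait asynchronously for a free slot before starting the next element.
            await throttler.WaitAsync();
            tasks.Add(ProcessAsync(item));
        }

        // Every element finishes before the first exception (if any) is rethrown here.
        await Task.WhenAll(tasks);

        async Task ProcessAsync(T item)
        {
            try
            {
                await body(item);
            }
            finally
            {
                // Free the slot whether body succeeded or failed.
                throttler.Release();
            }
        }
    }
}
```

The design choice here is to let every element run and surface the first exception at the end; stopping early on the first failure would additionally need a `CancellationTokenSource`. When called from the UI thread, the `await`s in this method capture the UI context, so each `body` is started on the UI thread.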

Theodor Zoulias
  • Your answer helped me understand my problem clearly; this is exactly what I wanted – The Piatre May 03 '21 at 05:43
  • Good answer, but it does leave the questions: when would you actually want parallelism, and how would you achieve it? – Jodrell Feb 14 '23 at 16:40
  • Isn't your `Run()` superfluous then? It seems like OP could just write `.Select(async partition => { using ... })`, right? – Good Night Nerd Pride Feb 14 '23 at 16:40
  • @Jodrell I would say that parallelism is needed when the work to be done is at least partially CPU-bound. You can achieve it by keeping the `Task.Run` in the original `ForEachParallelAsync`, or by switching to the .NET 6 [`Parallel.ForEachAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) API, and finding some other way to prevent the cross-thread-access exceptions. – Theodor Zoulias Feb 14 '23 at 16:55
  • @GoodNightNerdPride yes, the `Run` is most likely superfluous in this case. The `Task.Run` can be replaced directly by an `async` delegate. – Theodor Zoulias Feb 14 '23 at 16:59
  • probably something like https://stackoverflow.com/a/1644254/659190 – Jodrell Feb 14 '23 at 17:04
  • @Jodrell personally I am not a fan of the `Invoke`/`BeginInvoke`. I prefer to separate the parallelizable thread-agnostic code from the UI code using more structured techniques, like [this](https://stackoverflow.com/questions/53764416/linq-to-sql-in-a-parallel-loop-how-to-prevent-duplicate-insertions/75348172#75348172) one. But yes, your suggestion would also do the trick (with the caveat of potentially flooding the UI message loop with an excessive amount of messages). – Theodor Zoulias Feb 14 '23 at 17:18
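A sketch of the CPU-bound case discussed in these comments, assuming .NET 6 or later for `Parallel.ForEachAsync` and `ValueTask.CompletedTask`. The parallel part writes into a plain array and touches no UI objects; `ParallelWorkDemo` and `ComputeAllAsync` are hypothetical names, and `i * i` stands in for real CPU-bound work. This is one way of keeping thread-agnostic code separate from UI code, not necessarily the technique linked above.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelWorkDemo
{
    public static async Task<long[]> ComputeAllAsync(int count, int maxDegreeOfParallelism)
    {
        var results = new long[count];
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // The body runs on ThreadPool threads and must not touch UI objects.
        await Parallel.ForEachAsync(Enumerable.Range(0, count), options, (i, cancellationToken) =>
        {
            // Each index writes only to its own slot, so no locking is needed.
            results[i] = (long)i * i; // stand-in for real CPU-bound work
            return ValueTask.CompletedTask;
        });

        // When this method is called from the UI thread, execution resumes here
        // on the UI thread, because the await captured its SynchronizationContext.
        return results;
    }
}
```

From a WPF event handler, `var results = await ParallelWorkDemo.ComputeAllAsync(100, Environment.ProcessorCount);` resumes on the UI thread after the `await`, so the results can be assigned to controls at that point without cross-thread exceptions.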

You need to provide a way for the await machinery to post a message back to that same thread.

ThreadPool threads only listen for new messages which are posted to the ThreadPool, so you'll have to create your own non-ThreadPool thread, and make a message queue that it listens for messages on. Write your own SynchronizationContext subclass which knows how to post messages to that queue, and install it on that thread.

With that in place, await will use that SynchronizationContext to post messages to the thread's queue, which the thread can process.

What follows is a fairly basic implementation of this: it's not the most efficient (it creates some excess allocations), but it should be fairly easy to follow.

The SimpleDispatcher is a class which owns a message queue, and a thread which processes messages from that message queue, one by one. The thread has a custom SynchronizationContext installed on it, which posts messages back to that message queue.

Make sure you dispose the SimpleDispatcher! Otherwise its thread will keep running forever.

The dispatcher's SendAsync method takes a Func<Task>, which it executes on its thread, and it returns a Task which completes when the Task returned by the Func<Task> completes (and which carries any exception). You can use this to post messages to the dispatcher; see the sample in Main at the top.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
            
#nullable enable

public class Program
{
    public static async Task Main()
    {
        using var dispatcher = SimpleDispatcher.StartNew();
        
        var task1 = dispatcher.SendAsync(async () =>
        {
            Console.WriteLine($"Task 1: Thread {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(100);
            Console.WriteLine($"Task 1: Thread {Thread.CurrentThread.ManagedThreadId}");
        });
        
        var task2 = dispatcher.SendAsync(async () =>
        {
            Console.WriteLine($"Task 2: Thread {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(100);
            Console.WriteLine($"Task 2: Thread {Thread.CurrentThread.ManagedThreadId}");
        });
        
        await Task.WhenAll(task1, task2);
        
        Console.WriteLine("All tasks complete");
    }
}

public class SimpleDispatcher : IDisposable
{
    private readonly BlockingCollection<QueueItem> queue = new();
    private readonly CancellationTokenSource disposeCts = new();
    private readonly Thread thread;
    
    public SimpleDispatcher()
    {
        thread = new Thread(Run);
    }
    
    public static SimpleDispatcher StartNew()
    {
        var dispatcher = new SimpleDispatcher();
        dispatcher.Start();
        return dispatcher;
    }
    
    public void Start()
    {
        if (thread.IsAlive)
            throw new InvalidOperationException("Already running");
    
        thread.Start();
    }
    
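    public void Post(SendOrPostCallback callback, object? state)
    {
        var item = new QueueItem
        {
            Delegate = () =>
            {
                try
                {
                    callback(state);
                }
                catch (Exception e)
                {
                    // Re-throw any resulting exceptions on the ThreadPool (like an unhandled
                    // async void exception, this crashes the process rather than being swallowed).
                    // Re-posting to this SynchronizationContext instead would loop forever.
                    var exceptionInfo = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(e);
                    ThreadPool.QueueUserWorkItem(_ => exceptionInfo.Throw());
                }
                return Task.CompletedTask;
            },
            Tcs = new(),
        };
    
        queue.Add(item);
    }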
    
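    public void Send(SendOrPostCallback callback, object? state)
    {
        // Already on the dispatcher thread: run inline, since queueing the callback
        // and then blocking on it from this same thread would deadlock
        if (Thread.CurrentThread == thread)
        {
            callback(state);
            return;
        }
    
        var item = new QueueItem
        {
            Delegate = () =>
            {
                callback(state);
                return Task.CompletedTask;
            },
            Tcs = new(),
        };
    
        queue.Add(item);
        
        // Block until the dispatcher thread has run the callback, re-throwing any exception
        item.Tcs.Task.Unwrap().GetAwaiter().GetResult();
    }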
    
    public Task SendAsync(Func<Task> func)
    {
        var item = new QueueItem
        {
            Delegate = func,
            Tcs = new(),
        };
        
        queue.Add(item);
        
        return item.Tcs.Task.Unwrap();
    }
    
    private void Run()
    {
        SynchronizationContext.SetSynchronizationContext(new SimpleDispatcherSynchronizationContext(this));
    
        try
        {
            while (true)
            {
                var item = queue.Take(this.disposeCts.Token);

                try
                {
                    var task = item.Delegate();
                    item.Tcs.SetResult(task);
                }
                catch (Exception e)
                {
                    item.Tcs.SetException(e);
                }
            }
        }
        catch (OperationCanceledException) { }
    }
    
    public void Dispose()
    {
        disposeCts.Cancel();
    }
    
    private struct QueueItem
    {
        public Func<Task> Delegate;
        // When complete, the message has been processed, and the TCS contains the Task returned by Delegate
        public TaskCompletionSource<Task> Tcs;
    }
}

public class SimpleDispatcherSynchronizationContext : SynchronizationContext
{
    private readonly SimpleDispatcher dispatcher;
    
    public SimpleDispatcherSynchronizationContext(SimpleDispatcher dispatcher) => this.dispatcher = dispatcher;

    public override void Send(SendOrPostCallback d, object? state)
    {
        dispatcher.Send(d, state);
    }
    
    public override void Post(SendOrPostCallback d, object? state) 
    {
        dispatcher.Post(d, state);
    }
    
    public override SynchronizationContext CreateCopy() => new SimpleDispatcherSynchronizationContext(dispatcher);
}

See it in action on DotNetFiddle.


All of that said, it's utterly pointless if you create a SimpleDispatcher per partition and then just run a single thing on it at a time (e.g. a single loop with an await in it). You create a new dedicated thread, and then that thread just sits blocked on the queue during each await. You might as well grab a thread from the ThreadPool (with Task.Run) and use a blocking wait instead of an await: same effect, less complexity.
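A sketch of that simpler alternative, assuming the work after each asynchronous step can be written synchronously: a blocking wait inside `Task.Run` keeps the following code on the same ThreadPool thread (the class and method names are hypothetical). This occupies one ThreadPool thread for the duration of every wait, and that thread is still not the UI thread, so it only helps with objects created on that same worker thread.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    // Reports the managed thread IDs observed before and after waiting.
    public static Task<(int Before, int After)> RunOnOneThread() =>
        Task.Run(() =>
        {
            int before = Environment.CurrentManagedThreadId;

            // A blocking wait instead of an await: this thread sits idle until the
            // delay finishes and then carries on, so no thread switch happens here.
            Task.Delay(100).GetAwaiter().GetResult();

            int after = Environment.CurrentManagedThreadId;
            return (Before: before, After: after);
        });
}
```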

canton7