0

I need to fire off some Tasks to run, but I want them to be on SPECIFIC (the same) threads, every time they run. I don't know how to get that to happen except to perhaps instantiate a SingleThreadTaskScheduler (of my own creation). I am getting frames from a capture source, and I want to split off processing work onto parallel threads, but I need the parallel threads to operate on the frames in order. And for that to happen, they have to be the same thread as I fed last time, per processing pipeline. For instance, I have parallel processing pipelines A, B, and C. I need to feed AB&C each time I get a frame. They operate in parallel.

I saw another example on StackOverflow about how to create a single thread task scheduler, but it doesn't explain how I would be allowed to await the result and keep chugging in my current thread.

Here's the function I sort of need to execute. Task.Run() needs to be replaced by firing off x.LongRunningAsync() on a specific thread, not just some random one from the thread pool! That is, one specific thread PER item in this.Steps. The same thread needs to be called per call of DoParallelStuff. DoParallelStuff is called many times. The caller of this function wants to go off and do other stuff while these things are executing in parallel.

public async Task<bool> DoParallelStuff()
{
    var tasks = this.Steps.Select(x => Task.Run(() => x.LongRunningAsync()));
    var results = await Task.WhenAll(tasks);
    var completed = !results.Any() || results.All(x => x == true);
    this.OnCompleted();
    return completed;
}
eric frazer
  • 1,492
  • 1
  • 12
  • 19
  • See duplicate for literal answer to the question. That said, it's not clear why you believe you need to execute code _on a specific thread_. In most cases, using [`AsyncLocal`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.asynclocal-1) will suffice, by providing data storage that has affinity for your async _context_, independent of whatever thread is being used. – Peter Duniho Jul 05 '21 at 00:42
  • Can you include the code of the `LongRunningAsync` method? I am asking because of the `Async` suffix, that indicates an asynchronous method. And asynchronous operations are running normally on [zero threads](https://blog.stephencleary.com/2013/11/there-is-no-thread.html). – Theodor Zoulias Jul 05 '21 at 06:30
  • After reading the latest edit it seems to me that the work you are doing is not naturally asynchronous, but instead you are using asynchronous techniques to process a CPU-bound workload. It's not clear to me what exactly is the workflow and the constraints, but chances are that there are better ways to do what you are trying to do. Like the the [TPL Dataflow](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library or the [PLINQ](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq) library for example. – Theodor Zoulias Jul 07 '21 at 11:02
  • task parallel library doesn't seem to address my need to process the incoming frames parallelly, in ORDER. if I fire off random Task.Run()s, either by myself or through TPL, they get processed out of order. But you're right, they are cpu-bound operations. – eric frazer Jul 08 '21 at 17:19

2 Answers2

2

This problem can be solved with a custom TaskScheduler, but can also be solved with a custom SynchronizationContext. For example you could install Stephen Cleary's Nito.AsyncEx.Context package, and do this:

var tasks = this.Steps.Select(step => Task.Factory.StartNew(() =>
{
    AsyncContext.Run(async () =>
    {
        await step.LongRunningAsync();
    });
}, default, TaskCreationOptions.LongRunning, TaskScheduler.Default));

A dedicated thread is going to be launched for each step in this.Steps. Most probably this thread is going to be blocked for most of the time, while the LongRunningAsync is doing asynchronous stuff internally, but the continuations between the await points will be invoked on this thread.

It is important that all the await points inside the LongRunningAsync method are capturing the SynchronizationContext. A single ConfigureAwait(false) will cause the single-thread policy to fail.

And here is how a SingleThreadTaskScheduler can be used instead:

var tasks = this.Steps.Select(step => Task.Factory.StartNew(async () =>
{
    await step.LongRunningAsync();
}, default, TaskCreationOptions.None, new SingleThreadTaskScheduler()).Unwrap());

Pay attention to the Unwrap call, it's very important.

The previous note, regarding the required absence of ConfigureAwait(false) at the await points, applies here too.

An (untested) implementation of the SingleThreadTaskScheduler class can be found here.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
0

alright, I did some work over the weekend and figured it out. I used the SingleThreadTaskScheduler mentioned before, and, per pipeline (thread), I create a SingleThreadTaskScheduler. I modified this class to take a Func in Schedule(blah), and I passed in an action (what else would I call it), and I also modified the func to pass back a Task. Now, when I call Schedule(), I can use the returned Task to wait upon it, if I want, or I can ignore the Task and simply let it complete on the background thread. Either way, now I have full control. What I DON'T and can't have, however, is the ability for ANY call to 'await' within the Func<> that I send to the SingleThreadTaskScheduler. I don't know why it goes wrong, but if at any point, I use an 'await' in the code while running the SingleThreadTaskScheduler, it hangs. Dunno why, but don't care at this point, it is all running.

eric frazer
  • 1,492
  • 1
  • 12
  • 19