5

In .NET, I'd like to schedule a large number of Tasks, e.g. via Task.Run(...). Some of the tasks are of low importance and should be delayed by the scheduler if higher priority tasks are available for execution.

Is there a way to do this? There seems to be no TaskScheduler in .NET that supports scheduling with any kind of priority.

The tasks are short-running and non-hierarchical. To be clear, this is entirely unrelated to the priority of the thread executing the tasks.

QueuedTaskScheduler from ParallelExtensionsExtras seems to be what I am looking for, but it has been unmaintained for seven years, lacks documentation, and most of the links related to it are broken, so I'd rather not add a dependency on it.

mafu
  • Have you considered writing your own SynchronizationContext? See [this link](https://stackoverflow.com/a/39272758/2791540) and [this link](https://stackoverflow.com/a/11084309/2791540). – John Wu Feb 22 '20 at 09:42
  • @JohnWu seems a bit difficult to get it right, but I'll look into that – mafu Feb 22 '20 at 09:53

2 Answers

3

If you want to keep it simple, queue up an action instead of a task. Since we are queuing up asynchronous calls, the queue type is Func<Task>. Use two queues for the different priorities.

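// Requires System.Collections.Concurrent and System.Threading.Tasks.
readonly ConcurrentQueue<Func<Task>> _highPriorityQueue = new ConcurrentQueue<Func<Task>>();
readonly ConcurrentQueue<Func<Task>> _lowPriorityQueue = new ConcurrentQueue<Func<Task>>();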

Then create a worker proc to check both queues in priority order.

async Task WorkerProc(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        Func<Task> action;
        if (_highPriorityQueue.TryDequeue(out action))
        {
            await action();
            continue;
        }
        if (_lowPriorityQueue.TryDequeue(out action))
        {
            await action();
            continue;
        }
        await Task.Yield();
    }
}

Then start up some worker tasks (which run on thread pool threads) to work the queues:

var source = new CancellationTokenSource();
var threads = Enumerable.Range(0, numberOfThreads).Select( i =>
    Task.Run( () => WorkerProc(source.Token) )
).ToList();

And to add to a queue:

_highPriorityQueue.Enqueue( () => Foo() );
_lowPriorityQueue.Enqueue( () => Bar() );

To shut down:

source.Cancel();
await Task.WhenAll( threads );
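
As written, WorkerProc keeps spinning through Task.Yield() while both queues are empty. One possible way to avoid that is to count queued items with a SemaphoreSlim so idle workers wait asynchronously. This is only a minimal sketch, not a drop-in replacement: the class TwoLevelWorkQueue and its member names are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; the class and member names are illustrative only.
public sealed class TwoLevelWorkQueue
{
    private readonly ConcurrentQueue<Func<Task>> _high = new ConcurrentQueue<Func<Task>>();
    private readonly ConcurrentQueue<Func<Task>> _low = new ConcurrentQueue<Func<Task>>();

    // One permit per queued item, so idle workers wait instead of polling.
    private readonly SemaphoreSlim _pending = new SemaphoreSlim(0);

    public void Enqueue(Func<Task> work, bool highPriority)
    {
        (highPriority ? _high : _low).Enqueue(work);
        _pending.Release(); // signal only after the item is visible in its queue
    }

    public async Task WorkerProc(CancellationToken token)
    {
        try
        {
            while (true)
            {
                // Completes once at least one item is queued; throws on cancellation.
                await _pending.WaitAsync(token);

                // Always check the high-priority queue first.
                if (_high.TryDequeue(out var work) || _low.TryDequeue(out work))
                {
                    await work();
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation requested: let the worker finish normally.
        }
    }
}
```

Starting and stopping the workers stays the same as above, except that Task.WhenAll no longer has to wait for a polling loop to notice the token; each idle worker is released by the cancelled WaitAsync call.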
John Wu
  • What I was doing is quite close to this :) While this approach works well for me, it does not answer the more general question, so I'll not mark it as solution yet if you don't mind. – mafu Feb 22 '20 at 12:37
  • 1
    `while (true) { await Task.Yield(); }` looks like a tight loop to me. Pretty horrible waste of CPU resources. – Theodor Zoulias Feb 24 '20 at 15:03
  • For reference, that can be fixed with BlockingCollection, which also allows waiting for an item in several queues. – mafu Dec 14 '22 at 21:06
1

@John Wu gave a nice and simple answer for the case where there are only two priorities.

When you need more granular priorities, and possibly priority changes (such as boosting a priority), you can implement your own priority-based queue. A sorted or sortable list can be used for this, for example SortedList<TKey,TValue>.

For the keys, I would suggest combining the priority (as the primary component) with the scheduling time (as the secondary component). Be careful to make the keys unique, since SortedList<TKey,TValue> rejects duplicate keys.

Be very careful to make adding and removing tasks thread safe. SortedList<TKey,TValue> is not thread safe, so you will need to implement the synchronization yourself, for example with a lock.

Finally, the iteration: the WorkerProc will look very similar. There will be only one task list (instead of the task queues), with tasks being added and removed instead of enqueued and dequeued. Because of the way the key is constructed, each task sorts itself into the right position when it is added.
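
A rough sketch of that idea, under a few assumptions that are not part of the description above: the key is an (int priority, long sequence) tuple where a lower priority value runs first, a counter stands in for the scheduling time to guarantee unique keys, a plain lock provides thread safety, and a SemaphoreSlim lets idle workers wait. The class name PriorityWorkList and its members are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; names are illustrative only.
public sealed class PriorityWorkList
{
    // Key = (priority, sequence). Tuples compare element by element, so items
    // sort by priority first (lower value runs first) and by insertion order second.
    // The sequence number also keeps every key unique.
    private readonly SortedList<(int Priority, long Sequence), Func<Task>> _items =
        new SortedList<(int Priority, long Sequence), Func<Task>>();

    private readonly object _gate = new object();                   // guards _items and _nextSequence
    private readonly SemaphoreSlim _pending = new SemaphoreSlim(0); // one permit per item
    private long _nextSequence;

    public void Add(int priority, Func<Task> work)
    {
        lock (_gate)
        {
            _items.Add((priority, _nextSequence++), work);
        }
        _pending.Release();
    }

    private bool TryTake(out Func<Task> work)
    {
        lock (_gate)
        {
            if (_items.Count == 0)
            {
                work = null;
                return false;
            }
            work = _items.Values[0]; // smallest key = most urgent item
            _items.RemoveAt(0);      // O(n): the remaining entries are shifted
            return true;
        }
    }

    public async Task WorkerProc(CancellationToken token)
    {
        try
        {
            while (true)
            {
                await _pending.WaitAsync(token);
                if (TryTake(out var work))
                {
                    await work();
                }
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation requested: let the worker finish normally.
        }
    }
}
```

Boosting a priority would mean removing the entry under the lock and adding it again with a new key. If you are on .NET 6 or later, the built-in PriorityQueue<TElement,TPriority> could replace the SortedList and avoids the O(n) removal; it is not thread safe either, so the lock stays.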

protix