After carefully considering the other answers, I decided that for my purposes it was easier to create a custom QueuedTaskScheduler, given that I don't need to worry about async tasks or IO completion (although that has given me something to think about).
First, when we grab work from the child work pools, we add a semaphore-based lock inside FindNextTask_NeedsLock:
    var items = queueForTargetTask._workItems;
    if (items.Count > 0
        && queueForTargetTask.TryLock() /* This is added */)
    {
        targetTask = items.Dequeue();
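
To illustrate the effect of that extra condition, here is a minimal, self-contained sketch of the selection logic. It is not the real QueuedTaskScheduler code: `GatedQueue`, `QueueSelectionDemo` and `FindNext` are hypothetical names, and plain strings stand in for tasks. The point is only that a queue whose slots are all taken is skipped, so work from other queues can run in the meantime. Because `&&` short-circuits, the slot is only taken when the queue actually has an item to hand out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for QueuedTaskSchedulerQueue: work items plus a concurrency gate.
public sealed class GatedQueue
{
    public readonly Queue<string> WorkItems = new Queue<string>();
    private readonly Semaphore _semaphore;

    public GatedQueue(int maxConcurrency)
    {
        _semaphore = new Semaphore(maxConcurrency, maxConcurrency);
    }

    // Non-blocking attempt to take one of the queue's slots.
    public bool TryLock() { return _semaphore.WaitOne(0); }

    // Gives the slot back once the work item has finished.
    public void Release() { _semaphore.Release(); }
}

public static class QueueSelectionDemo
{
    // Returns the next item from the first queue that is non-empty and has a free slot.
    // The caller holds that slot until it calls Release() on the returned owner.
    public static string FindNext(IList<GatedQueue> queues, out GatedQueue owner)
    {
        foreach (var queue in queues)
        {
            if (queue.WorkItems.Count > 0 && queue.TryLock())
            {
                owner = queue;
                return queue.WorkItems.Dequeue();
            }
        }
        owner = null;
        return null;
    }

    public static List<string> Run()
    {
        var ordered = new GatedQueue(1);    // one task at a time
        var unordered = new GatedQueue(2);  // up to two tasks at a time
        ordered.WorkItems.Enqueue("o1");
        ordered.WorkItems.Enqueue("o2");
        unordered.WorkItems.Enqueue("u1");
        var queues = new List<GatedQueue> { ordered, unordered };

        var picked = new List<string>();
        GatedQueue first, second, third;
        picked.Add(FindNext(queues, out first));   // takes the ordered queue's only slot
        picked.Add(FindNext(queues, out second));  // ordered queue is full, so it is skipped
        first.Release();                           // the first item "finishes"
        picked.Add(FindNext(queues, out third));   // the ordered queue is eligible again
        return picked;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "o1, u1, o2"
    }
}
```
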
For the dedicated thread version, inside ThreadBasedDispatchLoop:
    // ... and if we found one, run it
    if (targetTask != null)
    {
        queueForTargetTask.ExecuteTask(targetTask);
        queueForTargetTask.Release();
    }
For the task scheduler version, inside ProcessPrioritizedAndBatchedTasks:
    // Now if we finally have a task, run it. If the task
    // was associated with one of the round-robin schedulers, we need to use it
    // as a thunk to execute its task.
    if (targetTask != null)
    {
        if (queueForTargetTask != null)
        {
            queueForTargetTask.ExecuteTask(targetTask);
            queueForTargetTask.Release();
        }
        else
        {
            TryExecuteTask(targetTask);
        }
    }
Where we create the new child queues, we pass the new maxConcurrency value through:
    /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
    /// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns>
    public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0, 1); }

    /// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
    /// <param name="priority">The priority level for the new queue.</param>
    /// <param name="maxConcurrency">The maximum number of tasks from the new queue that may run at once.</param>
    /// <returns>The newly created and activated queue at the specified priority and max concurrency.</returns>
    public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency)
    {
        // Create the queue
        var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this);
        ...
    }
Finally, inside the nested QueuedTaskSchedulerQueue:
    // This is added.
    private readonly int _maxConcurrency;
    private readonly Semaphore _semaphore;

    internal bool TryLock()
    {
        return _semaphore.WaitOne(0);
    }

    internal void Release()
    {
        _semaphore.Release();
        _pool.NotifyNewWorkItem();
    }

    /// <summary>Initializes the queue.</summary>
    /// <param name="priority">The priority associated with this queue.</param>
    /// <param name="maxConcurrency">Max concurrency for this scheduler.</param>
    /// <param name="pool">The scheduler with which this queue is associated.</param>
    internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool)
    {
        _priority = priority;
        _pool = pool;
        _workItems = new Queue<Task>();
        // This is added.
        _maxConcurrency = maxConcurrency;
        _semaphore = new Semaphore(_maxConcurrency, _maxConcurrency);
    }
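
The TryLock/Release pair above relies on `Semaphore.WaitOne(0)` being a non-blocking attempt: it returns true if a slot was free and false straight away if not. The following standalone sketch shows that behaviour in isolation for a max concurrency of 1. The `TryLockDemo` class is only an illustration and is not part of the scheduler.

```csharp
using System;
using System.Threading;

public static class TryLockDemo
{
    // Returns the results of three successive non-blocking acquisition attempts
    // against a semaphore with a single slot (the maxConcurrency = 1 case).
    public static bool[] Run()
    {
        using (var semaphore = new Semaphore(1, 1))
        {
            bool first = semaphore.WaitOne(0);   // slot is free, so it is acquired
            bool second = semaphore.WaitOne(0);  // slot is taken, returns false without waiting
            semaphore.Release();                 // what Release() does once a task has run
            bool third = semaphore.WaitOne(0);   // slot is free again
            semaphore.Release();
            return new[] { first, second, third };
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "True, False, True"
    }
}
```

A semaphore has no thread affinity, so the slot can be taken on the thread that finds the task and released on whichever thread finishes running it.
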
I hope this is useful for anyone trying to do the same as me: interleave unordered tasks with ordered tasks on a single, easy-to-use scheduler (one that can use the default thread pool or any other scheduler).
=== UPDATE ===
Inspired by Stephen Cleary, I ended up using:
    private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
        () => new WorkStealingTaskScheduler(16));

    public static TaskScheduler Default
    {
        get
        {
            return Scheduler.Value;
        }
    }

    public static TaskScheduler CreateNewOrderedTaskScheduler()
    {
        return new QueuedTaskScheduler(Default, 1);
    }
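
As a rough usage sketch, tasks can be queued to such a scheduler by passing it to `Task.Factory.StartNew`. QueuedTaskScheduler and WorkStealingTaskScheduler are not part of the base class library, so to keep the example self-contained it swaps in the built-in `ConcurrentExclusiveSchedulerPair` as a stand-in for `CreateNewOrderedTaskScheduler()`. That substitution is an assumption made purely for illustration, and `OrderedSchedulerUsage` is a hypothetical name. The sketch only checks that tasks on the scheduler never overlap.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedSchedulerUsage
{
    // Stand-in for CreateNewOrderedTaskScheduler(); NOT the QueuedTaskScheduler shown above.
    public static TaskScheduler CreateStandInOrderedScheduler()
    {
        return new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 1).ExclusiveScheduler;
    }

    // Queues taskCount tasks to the scheduler and returns the highest number
    // of them that were ever observed running at the same time.
    public static int Run(int taskCount)
    {
        TaskScheduler scheduler = CreateStandInOrderedScheduler();
        object gate = new object();
        int running = 0;
        int maxRunning = 0;

        var tasks = new Task[taskCount];
        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                lock (gate)
                {
                    running++;
                    if (running > maxRunning) maxRunning = running;
                }
                Thread.Sleep(5); // simulate a little work
                lock (gate) { running--; }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        }

        Task.WaitAll(tasks);
        return maxRunning;
    }

    public static void Main()
    {
        Console.WriteLine(Run(10)); // prints "1": no two tasks overlapped
    }
}
```
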