I am looking for a TaskScheduler that:

  1. Allows me to define a number of dedicated threads (e.g. 8); a standard LimitedConcurrencyLevelTaskScheduler (which uses threadpool threads) or a WorkStealingTaskScheduler does this.
  2. Allows me to create sub-TaskSchedulers that are fully ordered but schedule their tasks on the dedicated threads of the parent scheduler.

At the moment we use TaskScheduler.Default for the general pool (at the mercy of the threadpool growth algorithm, etc.) and a new OrderedTaskScheduler() whenever we want to order tasks. I want to keep this behavior but confine both kinds of work to my own pool of dedicated threads.
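
For concreteness, the current arrangement is roughly the following (a sketch; OrderedTaskScheduler is the ParallelExtensionsExtras sample type, and the handler delegates are placeholders):

TaskScheduler general = TaskScheduler.Default;       // unordered, threadpool-backed
TaskScheduler ordered = new OrderedTaskScheduler();  // one-at-a-time, but still threadpool threads

// Unordered work: parallelism decided by the threadpool.
Task.Factory.StartNew(() => HandleGetB(), CancellationToken.None,
    TaskCreationOptions.None, general);

// Ordered work: these run strictly one after the other.
Task.Factory.StartNew(() => HandleGetA(), CancellationToken.None,
    TaskCreationOptions.None, ordered);
Task.Factory.StartNew(() => HandleSetA(), CancellationToken.None,
    TaskCreationOptions.None, ordered);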

QueuedTaskScheduler seems to get pretty close. I thought the QueuedTaskScheduler.ActivateNewQueue() method, which returns a child TaskScheduler, would execute tasks *in order* on the pool of workers from the parent, but that doesn't seem to be the case: the child TaskSchedulers appear to have the same degree of parallelism as the parent.

I don't necessarily want the child taskscheduler tasks to be prioritised over the parent taskscheduler tasks (although it might be a nice feature in the future).

I have seen a related question here: "Limited concurrency level task scheduler (with task priority) handling wrapped tasks", but my requirements do not need to handle async tasks (all my enqueued tasks are completely synchronous from start to end, with no continuations).

jamespconnor
  • How many sub task schedulers do you need? I understand your tasks have dependencies which is why you want to (partially) order them. You could do this with ContinueWith chains. – usr Nov 04 '14 at 12:22
  • @usr We don't have an upper bound. Probably quite a few. We have a generic command dispatcher layer - certain command types shouldn't ever be run in parallel. E.g. Get and Set command for a data set A should be queued so we don't get sheared reads if both run in parallel. Whilst these are queued up and executing in series, a Get command for data set B can run in parallel on the parent scheduler. EDIT: Ideally we don't want to use ContinueWith chains due to the generic (& fire and forget) nature of the dispatch layer. – jamespconnor Nov 04 '14 at 13:08
  • You just need to keep track of the latest task in any given chain. When a new one comes in you set up the next continuation off of that task and store the new task. You drop the old one. Alternative solution: have one SemaphoreSlim per chain and use `await sem.WaitAsync()` to manually control the DOP very flexibly. I don't think schedulers are the right abstraction. Consider using ordinary task combinators and coordination primitives. – usr Nov 04 '14 at 13:15
  • Thanks for your advice. I will check out SemaphoreSlim, that might do it. – jamespconnor Nov 04 '14 at 13:36
  • Curious: are you trying to write [tsp](http://manpages.ubuntu.com/manpages/saucy/man1/tsp.1.html) for .NET? – hometoast Nov 04 '14 at 18:38

3 Answers

I assume by "fully ordered" you also mean "one at a time".

In that case, I believe there's a built-in solution that should do quite well: ConcurrentExclusiveSchedulerPair.

Your "parent" scheduler would be a concurrent scheduler:

TaskScheduler _parent = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 8)
     .ConcurrentScheduler;

And the "child" schedulers would be an exclusive scheduler that uses the concurrent scheduler underneath:

var myScheduler = new ConcurrentExclusiveSchedulerPair(_parent).ExclusiveScheduler;
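
For illustration, wiring the two together might look like this (a sketch; the 8-thread limit and the delegates are just examples):

TaskScheduler parent = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 8)
    .ConcurrentScheduler;

// One exclusive "child" per ordered stream; its tasks run one at a
// time, on the parent's (bounded) concurrency.
TaskScheduler childA = new ConcurrentExclusiveSchedulerPair(parent).ExclusiveScheduler;

// Unordered work goes straight to the parent...
Task.Factory.StartNew(() => Console.WriteLine("unordered"),
    CancellationToken.None, TaskCreationOptions.None, parent);

// ...ordered work goes to the child; these two never overlap.
Task.Factory.StartNew(() => Console.WriteLine("ordered 1"),
    CancellationToken.None, TaskCreationOptions.None, childA);
Task.Factory.StartNew(() => Console.WriteLine("ordered 2"),
    CancellationToken.None, TaskCreationOptions.None, childA);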
Stephen Cleary
  • I've just seen this. Interesting usage, wouldn't the ordered work I put on the `ExclusiveScheduler` gain a full lock on the parent scheduler and block unordered work that I push onto the `ConcurrentScheduler`? – jamespconnor Nov 04 '14 at 18:42
  • Answering my own comment - ah no, I see you've created a whole new pair for each child. Interesting. I will check it out, thank you. Looks like a far better way than molding my own `QueuedTaskScheduler`. – jamespconnor Nov 04 '14 at 18:44

After carefully considering the other answers, I decided that for my uses it was easier to create a customised QueuedTaskScheduler, given that I don't need to worry about async tasks or IO completion (although that has given me something to think about).

Firstly, when we grab work from the child work pools, we add a semaphore-based lock inside FindNextTask_NeedsLock:

var items = queueForTargetTask._workItems;
if (items.Count > 0
    && queueForTargetTask.TryLock() /* This is added */)
{
    targetTask = items.Dequeue();
    // ... remainder of the original method is unchanged

For the dedicated-thread version, inside ThreadBasedDispatchLoop:

// ... and if we found one, run it
if (targetTask != null)
{
    queueForTargetTask.ExecuteTask(targetTask);
    queueForTargetTask.Release();
}

For the task scheduler version, inside ProcessPrioritizedAndBatchedTasks:

// Now if we finally have a task, run it.  If the task
// was associated with one of the round-robin schedulers, we need to use it
// as a thunk to execute its task.
if (targetTask != null)
{
    if (queueForTargetTask != null)
    {
        queueForTargetTask.ExecuteTask(targetTask);
        queueForTargetTask.Release();
    }
    else
    {
        TryExecuteTask(targetTask);
    }
}

Where we create the new child queues:

/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns>
public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0, 1); }

/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <param name="priority">The priority level for the new queue.</param>
/// <param name="maxConcurrency">The maximum number of tasks from this queue that may execute concurrently.</param>
/// <returns>The newly created and activated queue at the specified priority.</returns>
public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency)
{
    // Create the queue
    var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this);

    ...
}

Finally, inside the nested QueuedTaskSchedulerQueue:

// This is added.
private readonly int _maxConcurrency;
private readonly Semaphore _semaphore;

internal bool TryLock()
{
    // Non-blocking: succeeds only while fewer than _maxConcurrency
    // of this queue's tasks are currently executing.
    return _semaphore.WaitOne(0);
}

internal void Release()
{
    _semaphore.Release();
    // Wake the pool so items this queue skipped while locked get re-scanned.
    _pool.NotifyNewWorkItem();
}

/// <summary>Initializes the queue.</summary>
/// <param name="priority">The priority associated with this queue.</param>
/// <param name="maxConcurrency">Max concurrency for this scheduler.</param>
/// <param name="pool">The scheduler with which this queue is associated.</param>
internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool)
{
    _priority = priority;
    _pool = pool;
    _workItems = new Queue<Task>();

    // This is added.
    _maxConcurrency = maxConcurrency;
    _semaphore = new Semaphore(_maxConcurrency, _maxConcurrency);
}

I hope this might be useful for someone trying to do the same as me: interleave unordered tasks with ordered tasks on a single, easy-to-use scheduler (one that can sit on top of the default threadpool, or any other scheduler).

=== UPDATE ===

Inspired by Stephen Cleary's answer, I ended up using:

private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
    () => new WorkStealingTaskScheduler(16));

public static TaskScheduler Default
{
    get
    {
        return Scheduler.Value;
    }
}

public static TaskScheduler CreateNewOrderedTaskScheduler()
{
    return new QueuedTaskScheduler(Default, 1);
}
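
Consumption then stays close to the original pattern (a sketch; the enclosing `Schedulers` class name and the handler delegates are illustrative):

// Unordered work shares the 16 dedicated worker threads.
Task.Factory.StartNew(() => HandleGetB(), CancellationToken.None,
    TaskCreationOptions.None, Schedulers.Default);

// Each ordered stream gets its own single-consumer queue on the same threads.
TaskScheduler orderedA = Schedulers.CreateNewOrderedTaskScheduler();
Task.Factory.StartNew(() => HandleGetA(), CancellationToken.None,
    TaskCreationOptions.None, orderedA);
Task.Factory.StartNew(() => HandleSetA(), CancellationToken.None,
    TaskCreationOptions.None, orderedA);   // never overlaps HandleGetA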
jamespconnor

I understand your tasks have dependencies which is why you want to (partially) order them. You could do this with ContinueWith chains. You just need to keep track of the latest task in any given chain. When a new one comes in you set up the next continuation off of that task and store the new task. You drop the old one.
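
A minimal sketch of that idea (the `SerialChains` type and its members are illustrative, not a known library API):

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

sealed class SerialChains
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, Task> _tails = new Dictionary<string, Task>();

    public Task Enqueue(string chainKey, Action work)
    {
        lock (_gate)
        {
            Task tail;
            Task next = _tails.TryGetValue(chainKey, out tail)
                ? tail.ContinueWith(_ => work())  // chain after the current tail
                : Task.Run(work);                 // first task in this chain
            _tails[chainKey] = next;              // store the new task, drop the old one
            return next;
        }
    }
}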

Alternative solution: have one SemaphoreSlim per chain and use `await sem.WaitAsync()` to manually control the DOP very flexibly. Note that async-waiting on a semaphore does not block any thread; it costs only a little memory, and no OS resource at all. You can have a very large number of semaphores in use.
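
A sketch of that approach (one semaphore per chain; an initial count of 1 gives full ordering, a larger count gives bounded parallelism):

using System;
using System.Threading;
using System.Threading.Tasks;

static class ChainA
{
    private static readonly SemaphoreSlim _sem = new SemaphoreSlim(1, 1);

    public static async Task Run(Func<Task> work)
    {
        await _sem.WaitAsync();   // async wait: no thread is blocked here
        try { await work(); }     // works for async IO as well as CPU work
        finally { _sem.Release(); }
    }
}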

I don't think schedulers are the right abstraction. Schedulers are for CPU-based work. The other coordination tools can work with any Task including async IO. Consider preferring ordinary task combinators and coordination primitives.

usr
  • Thanks @usr for putting your comments into a formal answer. I am accepting this answer as ultimately I think it's the correct direction for what I need. In particular, I will probably be moving more towards Async web calls using IO completion ports which won't work with my current solution posted as another answer. – jamespconnor Nov 04 '14 at 18:39