I am trying to create a TaskScheduler that limits the number of threads that can be run at the same time. I'm using this example. The problem is that I'm seeing some behavior I don't understand.

If I create my class like the example:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(5);
TaskFactory factory = new TaskFactory(lcts);

And then run it like this:

for (int i = 0; i < 10; ++i)
{
    factory.StartNew(() => DoWork());
}

And DoWork looks like this:

private async Task DoWork()
{
    // Do some stuff here like
    StaticIntValueWorkOne++;

    // And then more stuff that is async here
    int someValue = await DoAdditionalWorkAsync();
    Thread.Sleep(10000);
    StaticIntValueWorkTwo++;
}

What I see is that StaticIntValueWorkOne is incremented 10 times immediately, while StaticIntValueWorkTwo is incremented only once. After 10 seconds StaticIntValueWorkTwo is incremented again, and then once every 10 seconds after that. What I'm not getting is what the await on DoAdditionalWorkAsync() is doing with the concurrency. I was expecting to see StaticIntValueWorkOne incremented once, and then StaticIntValueWorkTwo incremented once. What am I missing? Do I just need an await on the factory.StartNew()?

Nicros
  • The `++` operator is not thread-safe; swap it out for `Interlocked.Increment(ref StaticIntValueWorkOne)` and `Interlocked.Increment(ref StaticIntValueWorkTwo)`, then let us know if the problem persists. The example you are following can use `++` because it is inside a `lock` statement, which prevents multiple threads from modifying the value at the same time. – Scott Chamberlain Mar 20 '15 at 05:28
  • Ah, I didn't know that. The problem persists, though... – Nicros Mar 20 '15 at 05:30
  • Related: [How to run a Task on a custom TaskScheduler using await?](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await) – Theodor Zoulias Aug 22 '22 at 06:55

2 Answers

> I am trying to create a TaskScheduler that limits the number of threads that can be run at the same time.

You might want to skip straight to the answer. ;)

var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
    .ConcurrentScheduler;

> What I'm not getting is what the await on DoAdditionalWorkAsync() is doing with the concurrency.

Task schedulers only apply to executing code. When an async method executes on a task scheduler, you can think of it as being broken up into multiple tasks, with a break at each await point. By default, after the await, the async method will re-enter its task scheduler. The async method is not "in" the task scheduler while it is awaiting.

So the scheduling limitation (5 at a time) simply doesn't apply while the method is awaiting. In your DoWork, the method first increments the variable, then yields to the task scheduler. While yielded, it doesn't "count" towards your concurrency limitation, which is why all 10 first increments happen immediately. Later, when the method resumes, it blocks the thread (which does "count") and then increments the second variable.

With this code:

private static void Main(string[] args)
{
    var scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 5)
        .ConcurrentScheduler;
    TaskFactory factory = new TaskFactory(scheduler);

    for (int i = 0; i < 10; ++i)
    {
        factory.StartNew(() => DoWork());
    }

    Console.ReadKey();
}

private static int StaticIntValueWorkOne, StaticIntValueWorkTwo;

private static async Task DoWork()
{
    // Do some stuff here like
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkOne" + Interlocked.Increment(ref StaticIntValueWorkOne));

    // And then more stuff that is async here
    await Task.Yield();
    Thread.Sleep(10000);
    Console.WriteLine(DateTime.UtcNow + " StaticIntValueWorkTwo" + Interlocked.Increment(ref StaticIntValueWorkTwo));
}

I get this (expected) output:

3/20/2015 11:01:53 AM StaticIntValueWorkOne1
3/20/2015 11:01:53 AM StaticIntValueWorkOne5
3/20/2015 11:01:53 AM StaticIntValueWorkOne4
3/20/2015 11:01:53 AM StaticIntValueWorkOne2
3/20/2015 11:01:53 AM StaticIntValueWorkOne3
3/20/2015 11:01:53 AM StaticIntValueWorkOne6
3/20/2015 11:01:53 AM StaticIntValueWorkOne9
3/20/2015 11:01:53 AM StaticIntValueWorkOne10
3/20/2015 11:01:53 AM StaticIntValueWorkOne7
3/20/2015 11:01:53 AM StaticIntValueWorkOne8
3/20/2015 11:02:03 AM StaticIntValueWorkTwo1
3/20/2015 11:02:03 AM StaticIntValueWorkTwo3
3/20/2015 11:02:03 AM StaticIntValueWorkTwo2
3/20/2015 11:02:03 AM StaticIntValueWorkTwo4
3/20/2015 11:02:03 AM StaticIntValueWorkTwo5
3/20/2015 11:02:13 AM StaticIntValueWorkTwo6
3/20/2015 11:02:13 AM StaticIntValueWorkTwo7
3/20/2015 11:02:13 AM StaticIntValueWorkTwo8
3/20/2015 11:02:13 AM StaticIntValueWorkTwo9
3/20/2015 11:02:13 AM StaticIntValueWorkTwo10

If you want to limit the concurrency of asynchronous code, look into SemaphoreSlim or TPL Dataflow.
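As a rough sketch of the TPL Dataflow option: an ActionBlock with MaxDegreeOfParallelism set to 5 should keep at most five asynchronous operations in flight, including the time they spend awaiting. The names ThrottledWorkDemo, DoWorkAsync and RunAsync are made up for this illustration, and the 200 ms delay only stands in for real asynchronous work. On .NET Framework the Dataflow types come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledWorkDemo
{
    private static readonly object Gate = new object();
    private static int _running;     // operations currently in flight
    private static int _maxRunning;  // highest value _running has reached

    // Hypothetical stand-in for the question's DoWork.
    private static async Task DoWorkAsync(int id)
    {
        lock (Gate)
        {
            _running++;
            if (_running > _maxRunning) _maxRunning = _running;
        }

        // The operation still occupies one of the five slots while awaiting.
        await Task.Delay(200);

        lock (Gate) { _running--; }
    }

    // Runs 10 work items through the block and returns the peak concurrency observed.
    public static async Task<int> RunAsync()
    {
        var block = new ActionBlock<int>(
            id => DoWorkAsync(id),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        for (int i = 0; i < 10; ++i)
        {
            block.Post(i);
        }

        block.Complete();        // no more items will be posted
        await block.Completion;  // wait until every item has finished

        lock (Gate) { return _maxRunning; }
    }

    public static async Task Main()
    {
        int max = await RunAsync();
        Console.WriteLine("Peak concurrent operations: " + max);
    }
}
```

Unlike a task scheduler, the block treats an item as busy until the Task returned by the delegate completes, so the limit covers the awaits as well.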

Stephen Cleary

TaskFactory.StartNew is not async-aware, so when the DoWork method reaches its first await and returns, StartNew considers the method complete.

Once the await completes, the rest of the method runs on whatever SynchronizationContext was captured (or, if there was none, on the current TaskScheduler).
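To make the "considers the method complete" point concrete, here is a small sketch. StartNew with an async delegate returns a Task<Task>: the outer task finishes when the delegate returns at its first await, and the inner task represents the whole async method. The names StartNewUnwrapDemo, DoWorkAsync and RunAsync are invented for this example, and the 500 ms delay is an arbitrary stand-in for real work.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewUnwrapDemo
{
    private static volatile bool _finished;

    // Hypothetical async method standing in for DoWork.
    private static async Task DoWorkAsync()
    {
        await Task.Delay(500);
        _finished = true;
    }

    // Returns true if the work had really finished once the unwrapped task completed.
    public static async Task<bool> RunAsync()
    {
        _finished = false;

        // StartNew only tracks the synchronous part of the delegate,
        // so its result type here is Task<Task>.
        Task<Task> outer = Task.Factory.StartNew(() => DoWorkAsync());

        // Unwrap creates a proxy task that completes when the inner task does.
        Task whole = outer.Unwrap();
        await whole;

        return _finished;
    }

    public static async Task Main()
    {
        bool finished = await RunAsync();
        Console.WriteLine("Work finished after awaiting the unwrapped task: " + finished);
    }
}
```

Note that Unwrap only fixes completion tracking. It does not make the scheduler's concurrency limit apply while the method is awaiting.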

With async-await you can do this by implementing your own synchronization context, as shown here. Alternatively, you can implement this type of concurrency limit with a SemaphoreSlim. Something like this:

// Allows at most 5 callers past WaitAsync at any one time.
private SemaphoreSlim semaphore = new SemaphoreSlim(5, 5);
private static int StaticIntValueWorkOne;
private static int StaticIntValueWorkTwo;

private async Task DoWork()
{
    // Acquire before the try block so that finally never releases a slot that was not taken.
    await semaphore.WaitAsync();
    try
    {
        // Do some stuff here like
        ("StaticIntValueWorkOne " + Interlocked.Increment(ref StaticIntValueWorkOne)).Dump();

        // And then more stuff that is async here
        int someValue = await DoAdditionalWorkAsync();
        await Task.Delay(10000);

        ("StaticIntValueWorkTwo " + Interlocked.Increment(ref StaticIntValueWorkTwo)).Dump();
    }
    finally
    {
        semaphore.Release();
    }
}

private async Task<int> DoAdditionalWorkAsync()
{
    await Task.Delay(5000);
    return 0;
}

void Main()
{
    for (int i = 0; i < 10; ++i) 
    {
        DoWork();
    }
}

Output:

StaticIntValueWorkOne 1
StaticIntValueWorkOne 2
StaticIntValueWorkOne 3
StaticIntValueWorkOne 4
StaticIntValueWorkOne 5
StaticIntValueWorkTwo 1
StaticIntValueWorkTwo 2
StaticIntValueWorkTwo 3
StaticIntValueWorkOne 6
StaticIntValueWorkOne 7
StaticIntValueWorkTwo 5
StaticIntValueWorkTwo 4
StaticIntValueWorkOne 8
StaticIntValueWorkOne 10
StaticIntValueWorkOne 9
StaticIntValueWorkTwo 7
StaticIntValueWorkTwo 8
StaticIntValueWorkTwo 9
StaticIntValueWorkTwo 6
StaticIntValueWorkTwo 10
NeddySpaghetti