I am using C#, TPL. I have a class that contains some asynchronous methods that execute some sub-tasks, for simplicity I will only consider one method and one sub-task:

class Test1
{
    private Task SubTask() => Task.Delay(1000);

    public async Task FullTask()
    {
        Console.WriteLine("Task Start");
        await SubTask();
        Console.WriteLine("Task Middle");
        await SubTask();
        Console.WriteLine("Task End");
    }

    static async Task Main()
    {
        Test1 Test = new Test1();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

Upon execution the following (expected) result gets printed in the console:

Task Start
Task Start
Task Middle
Task Middle
Task End
Task End

The problem is that each call to FullTask must run only after the previous one has completed: if multiple calls to FullTask happen simultaneously, they must be processed one by one. My first idea was to use the ContinueWith method:

class Test2
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(_ =>
            {
                Console.WriteLine("Task Start");
                SubTask().Wait();
                Console.WriteLine("Task Middle");
                SubTask().Wait();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test2 Test = new Test2();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

Again, upon execution the following (expected) result gets printed in the console:

Task Start
Task Middle
Task End
Task Start
Task Middle
Task End

The problem is that the lambda inside FullTask blocks its thread, because it uses SubTask().Wait(); rather than await SubTask();. If many instances of the Test2 class execute FullTask at the same time, thread pool starvation will occur. Changing either FullTask or the lambda (or both) to async does not solve the issue:

class Test3
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(async _ =>
            {
                Console.WriteLine("Task Start");
                await SubTask();
                Console.WriteLine("Task Middle");
                await SubTask();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test3 Test = new Test3();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

Here ContinueWith returns a Task<Task>. The outer Task is the continuation scheduled to run after LastTask; it completes at the first await and returns the inner (compiler-generated) Task, which completes at the end of the lambda. I am interested in the inner task, but it is not created until the outer task reaches the first await, so LastTask ends up holding the outer task instead. Thus this approach does not work.
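
For reference, the Task<Task> nesting described above can be flattened with the Unwrap extension method, which returns a proxy Task that completes only when the inner task does. The following is a minimal sketch, not a definitive fix: the class name Test3Unwrapped, the dedicated _gate lock object, the shortened delay, and the explicit TaskScheduler.Default argument are assumptions that are not part of the original code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical variant of Test3 (name and _gate field are assumptions).
class Test3Unwrapped
{
    // Dedicated lock object, so the lock target never changes when _lastTask is reassigned.
    private readonly object _gate = new object();
    private Task _lastTask = Task.CompletedTask;

    // Shortened delay so the sketch finishes quickly.
    private Task SubTask() => Task.Delay(50);

    public Task FullTask()
    {
        lock (_gate)
        {
            // ContinueWith(async ...) yields Task<Task>. Unwrap() returns a proxy
            // that completes when the inner (async lambda) task completes, so the
            // next continuation is chained onto the end of the lambda.
            return _lastTask = _lastTask.ContinueWith(async _ =>
            {
                Console.WriteLine("Task Start");
                await SubTask();
                Console.WriteLine("Task Middle");
                await SubTask();
                Console.WriteLine("Task End");
            }, TaskScheduler.Default).Unwrap();
        }
    }
}
```

Under these assumptions, two overlapping calls to FullTask should print Start, Middle, End for the first call before the second call prints anything, without blocking a thread during the delays.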

What I want is a non-blocking approach that produces the same results as Test2. Any ideas?

Coconut9
  • 2
    You can use [SemaphoreSlim](https://learn.microsoft.com/en-us/dotnet/standard/threading/semaphore-and-semaphoreslim) for that. It provides a locking mechanism with an async API. – Kalten Jun 10 '21 at 11:58
  • @Kalten doesn't that also block-spinwait? – Coconut9 Jun 10 '21 at 12:02
  • What do you mean by `block-spinwait`? It will act more or less like a lock statement – Kalten Jun 10 '21 at 12:05
  • 1
    Is this helpful? [How to combine asynchrony with locking?](https://stackoverflow.com/questions/44269412/how-to-combine-asynchrony-with-locking) – Theodor Zoulias Jun 10 '21 at 12:26
  • 1
    @Coconut9 what are you trying to do, and what do you mean by `atomically`? `atomic` doesn't mean that only one thread can execute at a time, although that seems to be what you try to do. `await` replaces `ContinueWith`. You could simply add a `lock` around the *original* method but that would block the caller thread. You can use a SemaphoreSlim and await asynchronously using [SemaphoreSlim.WaitAsync](https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim.waitasync?view=net-5.0#System_Threading_SemaphoreSlim_WaitAsync) – Panagiotis Kanavos Jun 10 '21 at 12:40
  • 2
    For more advanced scenarios, especially when you need to pass data to the method, you can use an ActionBlock or a Channel. With eg `var block=new ActionBlock(msg=>FullTask(msg));` the block will process all requests using a worker task, in the order they're received – Panagiotis Kanavos Jun 10 '21 at 12:46
  • @Panagiotis Kanavos, I think that I will use both methods, in different situations. Thanks. – Coconut9 Jun 10 '21 at 13:13
  • Related: [Task sequencing and re-entracy](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy) – Theodor Zoulias Jun 10 '21 at 23:11

1 Answer

If you want an iteration of the ContinueWith approach (using the more modern await), something like this should work:

private readonly object _lastTaskMutex = new object();
public Task FullTask()
{
  lock (_lastTaskMutex)
    return LastTask = RunAfterAsync(LastTask);

  async Task RunAfterAsync(Task lastTask)
  {
    try
    {
      await lastTask;
    }
    catch { }
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
}
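
The same chaining idea can be pulled out into a small reusable helper. This is only a sketch under stated assumptions: SerialTaskQueue and Enqueue are hypothetical names that do not appear in the answer, and the helper simply wraps the RunAfterAsync pattern shown above around an arbitrary async delegate.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not from the answer): serializes arbitrary async work
// by chaining each operation onto the previously enqueued one.
sealed class SerialTaskQueue
{
    private readonly object _mutex = new object();
    private Task _lastTask = Task.CompletedTask;

    // Returns a task that completes when this particular operation has finished.
    public Task Enqueue(Func<Task> operation)
    {
        // The lock only protects the read-and-replace of _lastTask;
        // the ordering itself comes from awaiting the previous task.
        lock (_mutex)
            return _lastTask = RunAfterAsync(_lastTask, operation);
    }

    private static async Task RunAfterAsync(Task previous, Func<Task> operation)
    {
        try
        {
            await previous;
        }
        catch
        {
            // A failed predecessor must not prevent later operations from running.
        }
        await operation();
    }
}
```

A caller would write something like `queue.Enqueue(() => DoWorkAsync())`, where DoWorkAsync is whatever async method needs to be serialized. One caveat of this design: when the previous task has already completed, the synchronous part of the operation (everything before its first incomplete await) runs while the lock is still held, so that part should be kept short.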

If you just care about mutual exclusion and the exact order doesn't matter, then a SemaphoreSlim would work:

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task FullTask()
{
  await _mutex.WaitAsync();
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
  finally
  {
    _mutex.Release();
  }
}

Or, if what you really want is a strict-FIFO queue of operations, then a Channel or ActionBlock, as suggested by Panagiotis Kanavos in the comments, is appropriate; in this case, you usually want to pass a TaskCompletionSource<T> to indicate when individual requests have completed:

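// Note: a field initializer cannot reference instance members, so this form compiles
// only if SubTask is static; otherwise assign _block in the constructor instead.
private readonly ActionBlock<TaskCompletionSource<object>> _block = new ActionBlock<TaskCompletionSource<object>>(async tcs =>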
{
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
    tcs.TrySetResult(null);
  }
  catch (Exception ex)
  {
    tcs.TrySetException(ex);
  }
});

public Task FullTask()
{
  var tcs = new TaskCompletionSource<object>();
  _block.Post(tcs);
  return tcs.Task;
}
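
The answer mentions Channel as an alternative but only shows the ActionBlock version. A rough Channel-based equivalent might look like the sketch below. The class name ChannelTest, the single consumer loop started in the constructor, the shortened delay, and the use of TaskCompletionSource<bool> instead of TaskCompletionSource<object> are all assumptions made for illustration. It relies on System.Threading.Channels and `await foreach`, so it assumes .NET Core 3.0 or later with C# 8.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical Channel-based variant of the ActionBlock example.
class ChannelTest
{
    private readonly Channel<TaskCompletionSource<bool>> _channel =
        Channel.CreateUnbounded<TaskCompletionSource<bool>>(
            new UnboundedChannelOptions { SingleReader = true });

    public ChannelTest()
    {
        // A single consumer loop drains requests in the order they were written.
        _ = Task.Run(() => ProcessRequestsAsync());
    }

    // Shortened delay so the sketch finishes quickly.
    private Task SubTask() => Task.Delay(50);

    private async Task ProcessRequestsAsync()
    {
        await foreach (var tcs in _channel.Reader.ReadAllAsync())
        {
            try
            {
                Console.WriteLine("Task Start");
                await SubTask();
                Console.WriteLine("Task Middle");
                await SubTask();
                Console.WriteLine("Task End");
                tcs.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // Report the failure to the caller and keep processing later requests.
                tcs.TrySetException(ex);
            }
        }
    }

    public Task FullTask()
    {
        // RunContinuationsAsynchronously keeps caller continuations off the consumer loop.
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_channel.Writer.TryWrite(tcs))
            tcs.TrySetException(new InvalidOperationException("The channel is closed."));
        return tcs.Task;
    }
}
```

Because there is exactly one reader, requests are handled strictly in the order in which TryWrite accepted them, and each caller's task completes only when its own request has been processed.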
Stephen Cleary
  • In the first code block, with `lock`, I think it will not work because you don't await the task. It will return (almost) immediately and leave the lock block before executing lastTask. The sample will work because lastTask is an already completed task, but with a real one it won't – Kalten Jun 10 '21 at 18:52
  • @Kalten: The `lock` is just protecting the `LastTask` variable; it's not providing the mutual exclusion of the logic. – Stephen Cleary Jun 10 '21 at 19:02
  • So nothing prevents `Task Start` from being printed before any other `Task End`, right? You seem to be a lot more expert on this topic than me, so I'm trying to understand what I don't see :) – Kalten Jun 10 '21 at 19:12
  • 1
    @Kalten: Walk through it like you were the compiler. First, take the lock. Then, read the value of `LastTask` and pass it to the `RunAfterAsync` method. That method `await`s that task; if it isn't complete, `RunAfterAsync` returns an incomplete task. That task is then assigned to `LastTask`, and then the lock is released. The `await lastTask` inside `RunAfterAsync` ensures each of them run one at a time. – Stephen Cleary Jun 10 '21 at 19:53