35

I have a mostly IO-bound continuous task (a background spellchecker talking to a spellcheck server). Sometimes this task needs to be put on hold and resumed later, depending on user activity.

While suspend/resume is essentially what async/await does, I've found little information on how to implement the actual pause/play logic for an asynchronous method. Is there a recommended pattern for this?

I've also looked at using Stephen Toub's AsyncManualResetEvent for this, but thought it might be overkill.

noseratio
  • While I find your question interesting, I think the question itself is wrong. First, I think that a spellchecker should not be a continuous task, but should be run on-demand, when the contents of the document change, and only on those contents which changed. Then, I can't see why you're striving to recycle the same task. Since tasks are lightweight, you should be able to terminate the spellchecker task using its `CancellationToken` and then create a new task when you need to restart it. – Giulio Franco Oct 27 '13 at 01:04
  • @GiulioFranco, broadly speaking, the editor control is a plug-in to the client app (other editors could be plugged in instead). At a high level it's still the responsibility of the editor to organize the spellcheck process. I prefer an `async/await`-based model to an event-based one, mostly because it's simpler to implement. So, there's still an async background task on the higher level, detecting document changes and starting another spellcheck pass. This is how it's implemented, in fact. Thus, the original question remains valid. – noseratio Oct 27 '13 at 01:36
  • As I already mentioned, can't you just create and destroy multiple tasks? The task is *not* the object that is performing the computation. You can keep your global state in a `Spellchecker` object, with a method `ResumeSpellCheck` that creates and starts a new task to run a private `SpellCheckAsync` method of the object, and a `StopSpellCheck` method, which cancels the running task, if any. You can use a `CancellationTokenSource` stored in an instance field (since at most one spellcheck task is running at any time for any `Spellchecker` instance) – Giulio Franco Oct 27 '13 at 01:38
  • @GiulioFranco, the actual code is much more complex. There's a lot more going on in place of `await Task.Delay(100, token)`, like the actual spellcheck pass (which *is* a computational task done locally via `Task.Run`). Then possibly there's a request to the spellcheck server for the words not found locally, or a remote dictionary update. Then there's another task detecting any document changes (also done via `Task.Run`). The whole process has its own state. **Stopping it is different from pausing.** E.g., the misspelled underlined words remain when paused. Makes sense? – noseratio Oct 27 '13 at 02:12
  • And from the client app's perspective, it's just a Start/Stop/Pause/Resume API. That's the context of my question. Perhaps I shouldn't have mentioned the spellcheck at all. – noseratio Oct 27 '13 at 02:17
  • "Stopping it is different from pausing. E.g., the misspelled underlined words remain when paused. Makes sense?" What remains depends on what you store into your variables. In the worst case, if you implement it as an instruction interpreter, you can stop the task, and then create a new task that will resume the computation from the exact last instruction that was processed by the previous task, before it was stopped. – Giulio Franco Oct 27 '13 at 02:29
  • You can even store pending HTTP request data, and task number 2 can receive the answer to the request that was made by task number 1 before it was destroyed. What I'm proposing **is** a start/stop interface, with no concerns about what the task does. It's you who needs to implement the task in such a way that it can be stopped and restarted afterwards. – Giulio Franco Oct 27 '13 at 02:30
  • @GiulioFranco: *In the worst case, if you implement it as an instruction interpreter, you can stop the task, and then create a new task that will resume the computation from the exact last instruction that was processed by the previous task, before it was stopped*. **True, and you're talking about the state machine here.** That's exactly what `async/await` is for, and I just want to take advantage of it, to avoid chopping my logic into a set of callbacks. Check this out: [Pause and Play with Await](http://msdn.microsoft.com/en-us/magazine/hh456403.aspx). – noseratio Oct 27 '13 at 02:34
  • It seems to me that what you're getting at is far more complex than the solution I proposed, with little if any advantage in the linearity of the actual task code. But maybe I'm just overlooking the spellcheck. – Giulio Franco Oct 27 '13 at 02:47

5 Answers

11

AsyncManualResetEvent is exactly what you need, considering how messy your current code is. But a slightly better solution would be to use another approach from Stephen Toub: PauseToken. It works similarly to AsyncManualResetEvent, except its interface is made specifically for this purpose.
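For readers who have not seen the pattern, here is a minimal sketch of the `PauseToken` idea. It is loosely modeled on Stephen Toub's design but is not his code: the names `SimplePauseSource` and `SimplePauseToken` are made up for illustration, a plain lock is used for simplicity, and it assumes .NET Framework 4.6 or later (for `Task.CompletedTask` and `RunContinuationsAsynchronously`).

```csharp
using System.Threading.Tasks;

// Hypothetical names: a simplified variant of the PauseToken idea, not the original code.
public sealed class SimplePauseSource
{
    private readonly object _gate = new object();
    private TaskCompletionSource<bool> _resumeTcs; // non-null only while paused

    public bool IsPaused
    {
        get { lock (_gate) { return _resumeTcs != null; } }
    }

    // Requests a pause; the worker stops at its next WaitWhilePausedAsync call.
    public void Pause()
    {
        lock (_gate)
        {
            if (_resumeTcs == null)
            {
                _resumeTcs = new TaskCompletionSource<bool>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
            }
        }
    }

    // Releases every worker currently waiting on the pause.
    public void Resume()
    {
        TaskCompletionSource<bool> tcs;
        lock (_gate)
        {
            tcs = _resumeTcs;
            _resumeTcs = null;
        }
        if (tcs != null)
        {
            tcs.TrySetResult(true); // completed outside the lock
        }
    }

    public SimplePauseToken Token { get { return new SimplePauseToken(this); } }

    internal Task WaitWhilePausedAsync()
    {
        lock (_gate)
        {
            return _resumeTcs != null ? _resumeTcs.Task : Task.CompletedTask;
        }
    }
}

public struct SimplePauseToken
{
    private readonly SimplePauseSource _source;

    internal SimplePauseToken(SimplePauseSource source) { _source = source; }

    public bool IsPaused { get { return _source != null && _source.IsPaused; } }

    // Completes immediately when not paused, so it is cheap to call at every safe point.
    public Task WaitWhilePausedAsync()
    {
        return _source != null ? _source.WaitWhilePausedAsync() : Task.CompletedTask;
    }
}
```

The worker would call `await token.WaitWhilePausedAsync();` at each point where it is safe to stop. Note that `Pause()` here only requests the pause and returns at once; it does not tell the caller when the worker has actually stopped.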

Theodor Zoulias
svick
  • That linked `PauseToken` code doesn't even work properly. Take out the `await Task.Delay(100)` in his example and turn the `for` loop into an endless one and it completely stops working. It's not exactly a testament to code quality if even the actual authors of the TPL don't know how to use it properly. – user3700562 Jul 19 '23 at 15:45
11

Updated for 2019: I recently had a chance to revisit this code. Below is a complete example as a console app (warning: `PauseTokenSource` needs good unit testing).

Note that in my case the requirement was that by the time the consumer-side code (which requested the pause) continues, the producer-side code has already reached the paused state. Thus, by the time the UI is ready to reflect the paused state, all background activity is expected to have already paused.

using System;
using System.Threading.Tasks;
using System.Threading;

namespace Console_19613444
{
    class Program
    {
        // PauseTokenSource
        public class PauseTokenSource
        {
            bool _paused = false;
            bool _pauseRequested = false;

            TaskCompletionSource<bool> _resumeRequestTcs;
            TaskCompletionSource<bool> _pauseConfirmationTcs;

            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);

            public PauseToken Token { get { return new PauseToken(this); } }

            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        var resumeRequestTcs = _resumeRequestTcs;
                        _paused = false;
                        _pauseRequested = false;
                        _resumeRequestTcs = null;
                        _pauseConfirmationTcs = null;
                        resumeRequestTcs.TrySetResult(true);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }

                    Task pauseConfirmationTask = null;

                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }

                    await pauseConfirmationTask;

                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }

            private async Task WaitForResumeRequestAsync(CancellationToken token)
            {
                using (token.Register(() => _resumeRequestTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _resumeRequestTcs.Task;
                }
            }

            private async Task WaitForPauseConfirmationAsync(CancellationToken token)
            {
                using (token.Register(() => _pauseConfirmationTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _pauseConfirmationTcs.Task;
                }
            }

            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;

                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    resumeRequestTask = WaitForResumeRequestAsync(token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }

                await resumeRequestTask;
            }
        }

        // PauseToken - consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public Task<bool> IsPaused() { return _source.IsPaused(); }

            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source.PauseIfRequestedAsync(token);
            }
        }

        // Basic usage

        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();

                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    // Pass the token so that cancellation can also interrupt a pause or a delay
                    await pause.PauseIfRequestedAsync(token);
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");

                    await Task.Delay(1000, token);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                await pts.ResumeAsync();
                Console.WriteLine("After resume");
            }
        }

        static async Task Main()
        {
            await Test(CancellationToken.None);
        }
    }
}
noseratio
  • It seems an odd design decision that `PauseToken` has both a with-response and a without-response version. Why not always trigger a response and leave it up to the user of the `PauseTokenSource` to decide whether to wait for it or not? (Also, I think if `PauseTokenSource.Pause()` and then `PauseToken.WaitWhilePausedWithResponseAsyc()` is called, you will get a `NullReferenceException` on the `response.TrySetResult(true);` in `PauseTokenSource.WaitWhilePausedWithResponseAsyc`.) – Scott Chamberlain Feb 11 '14 at 21:01
  • @ScottChamberlain, I think he logically follows the design pattern of `CancellationTokenSource.Cancel`, which only requests the cancellation. Thanks for spotting a possible bug, I'll look at it. – noseratio Feb 11 '14 at 21:05
  • I get a `NullReferenceException`, [try this version with better logging and a 3rd test case](http://pastebin.com/index/Qs1N7RB9). It should give you a `NullReferenceException` about a second after you hit return the 2nd time. – Scott Chamberlain Feb 11 '14 at 22:09
  • @ScottChamberlain, good spot, thanks! I guess I was too concentrated on `PauseWithResponseAsync` :) – noseratio Feb 11 '14 at 22:13
  • That's why I think `WaitWhilePausedAsync` does not need to exist at all and `.Pause()` just needs to replace `m_pauseResponse = null;` with `m_pauseResponse = s_completedTask;` and it should be fixed. – Scott Chamberlain Feb 11 '14 at 22:15
8

All the other answers seem either complicated or miss the mark when it comes to async/await programming, because they hold the thread, which is CPU-expensive and can lead to deadlocks. After a lot of trial and error, and many deadlocks, this finally worked in my high-usage test.

var isWaiting = true;
while (isWaiting)
{
    try
    {
        //A long delay is key here to prevent the task system from holding the thread.
        //The cancellation token allows the work to resume with a notification 
        //from the CancellationTokenSource.
        await Task.Delay(10000, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        //Catch the cancellation and it turns into continuation
        isWaiting = false;
    }
}
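The loop above only shows the waiting side. As a rough sketch of how the resuming side might fit in, the hypothetical `DelayGate` helper below (the name and both methods are made up for illustration) wraps the same "cancel the delay to resume" idea. It assumes one detail the snippet leaves open: a `CancellationTokenSource` cannot be reset, so a fresh one is swapped in each time the gate is released.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the answer above.
public sealed class DelayGate
{
    private CancellationTokenSource _resumeSignal = new CancellationTokenSource();

    // Called by the controlling code to release whoever is waiting.
    public void Resume()
    {
        // Swap in a fresh source for the next pause, then cancel the old one.
        // The old source is left undisposed so that a late reader can still get its token.
        var old = Interlocked.Exchange(ref _resumeSignal, new CancellationTokenSource());
        old.Cancel();
    }

    // Called by the worker; completes once Resume() has been called.
    public async Task WaitForResumeAsync()
    {
        var token = Volatile.Read(ref _resumeSignal).Token;
        try
        {
            // An infinite delay holds no thread while waiting.
            await Task.Delay(Timeout.Infinite, token);
        }
        catch (OperationCanceledException)
        {
            // Cancellation here means "resume", not "stop".
        }
    }
}
```

A real Stop would still need its own, separate `CancellationToken`, since this one is repurposed as the resume signal.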
Ben Gripka
4

This works for me:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTest2
{
    class Program
    {
        static ManualResetEvent mre = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            mre.Set();
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    Console.WriteLine("________________");
                    mre.WaitOne();
                }
            });

            Thread.Sleep(10000);
            mre.Reset();
            Console.WriteLine("Task Paused");
            Thread.Sleep(10000);
            Console.WriteLine("Task Will Resume After 1 Second");
            Thread.Sleep(1000);
            mre.Set();

            Thread.Sleep(10000);
            mre.Reset();
            Console.WriteLine("Task Paused");

            Console.Read();
        }
    }
}
Mehmet Topçu
  • Don't mix `WaitOne` and async/await. This solution is very similar to the one I first implemented on my project. It lasted for about a year in production, but it has a flaw in a thread-starved environment: `WaitOne` occupies the thread rather than releasing it while waiting, which can lead to high CPU usage and a deadlock. – Ben Gripka Feb 14 '20 at 06:29
2

OK, maybe this deserves an answer, but I'm not that familiar with C#, I don't have MonoDevelop here, and it's 3 o'clock in the morning, so please have pity.

I'm suggesting something like this:

using System.Threading;
using System.Threading.Tasks;

class Spellchecker
{
  private CancellationTokenSource mustStop = null;
  private volatile Task currentTask = null;

  //TODO add other state variables as needed

  public void StartSpellchecker()
  {
    if (currentTask != null)
    {
      /*
      * A task is already running,
      * you can either throw an exception
      * or silently return
      */
      return;
    }

    mustStop = new CancellationTokenSource();
    var token = mustStop.Token;
    // An async method returns an already-started task, so calling Start() on it
    // would throw. Task.Run also keeps the loop off the caller's thread.
    currentTask = Task.Run(() => SpellcheckAsync(token));
  }

  private async Task SpellcheckAsync(CancellationToken ct)
  {
    while (!ct.IsCancellationRequested)
    {
      /*
      * TODO perform spell check
      * This method must be the only one accessing
      * the spellcheck-related state variables
      */
      await Task.Yield(); // placeholder for the real asynchronous work
    }
    currentTask = null;
  }

  public async Task StopSpellchecker()
  {
    if (currentTask == null)
    {
      /*
      * There is no task running
      * you can either throw an exception
      * or silently return
      */
    }
    else
    {
      /*
      * A CancelAfter(TimeSpan) method
      * is also available, which might interest you
      */
      mustStop.Cancel();

      //Remove the following lines if you don't want to wait for the task to actually stop
      var task = currentTask;
      if (task != null)
      {
        await task;
      }
    }
  }
}
Noctis
Giulio Franco
  • Thanks, but as far as I understand this, it does `Start`/`Stop` logic, which destroys the state of the process. The API for the client app is `Start`/`Stop`/`Pause`/`Resume`. I've tried to explain how `Start`/`Stop` is different from `Pause`/`Resume` in my comments to the question. – noseratio Oct 27 '13 at 02:27
  • You're wrong. This is not a Start/Stop. This is a Pause/Resume. If the `SpellcheckAsync` method does not use local variables (it can only use instance fields), then the only thing which is lost upon pause is the program counter. But the cancellation is done via a `CancellationToken`, so the task can decide when to stop, which basically allows recovering the program counter, too. – Giulio Franco Oct 27 '13 at 02:44
  • I guess it depends on how you look at it. If by `SpellcheckAsync` you meant a replacement for my `DoWorkAsync`, then cancellation is a `Stop` here and not a `Pause`, IMO. Yes, I'm after the *natural linear flow for my high-level process logic* (let it be spellcheck or anything else). – noseratio Oct 27 '13 at 02:56