40

I have such code (simplified here) which awaits finishing task:

var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b => 
   { 
      if (b) 
          task_completion_source.SetResult(true); 
   });
await task_completion_source.Task;    

The idea is to subscribe and wait for the true in the stream of booleans. This finishes the "task" and I can move on beyond the await.

However I would like to cancel -- but not subscription, but awaiting. I would like to pass cancel token (somehow) to task_completion_source so when I cancel the token source, the await will move on.

How to do it?

Update: CancellationTokenSource is external to this code, all I have here is the token from it.

astrowalker
  • 3,123
  • 3
  • 21
  • 40
  • What's wrong with `task_completion_source.SetCanceled`? Mind you, that assumes you're handling task cancellation correctly :) – Luaan Oct 06 '16 at 13:22
  • @Luaan, it is nothing wrong, but there is no code **running** which could execute it. Everyone is waiting for something -- subscription waits for data (could be none), `await` waits for the task to complete. – astrowalker Oct 06 '16 at 13:27
  • If you cancel the async process via the cancellationtoken it will trigger a TaskCanceledException which in turn will end the await (you'll need to handle the exception). – Steve Bird Oct 06 '16 at 13:27
  • @SteveBird, what do you mean by "cancel the async process" -- setting cancel at the CancellationTokenSource? If yes, it is external party of the function. If you mean handling cancelled token -- it is exactly my question. I don't see a point how I can use the token in the first place. – astrowalker Oct 06 '16 at 13:29

3 Answers3

58

If I understand you correctly, you can do it like this:

using (cancellationToken.Register(() => {
    // this callback will be executed when token is cancelled
    task_comletion_source.TrySetCanceled();
})) {
    // ...
    await task_comletion_source.Task;
}

Note that it will throw an exception on your await, which you have to handle.

Jeff Fischer
  • 2,063
  • 1
  • 17
  • 12
Evk
  • 98,527
  • 8
  • 141
  • 191
  • 3
    The op clarified, you may want to to `CancellationToken token; token.Register(...` to match his code more. Also, you may want to do `TrySetCanceled` (and use `TrySetResult`) so you don't throw a exception if the task is complete already. – Scott Chamberlain Oct 06 '16 at 13:32
  • Thank you very much, the token source is not the problem here, because it was not involved (in original edit :-) ). – astrowalker Oct 06 '16 at 13:40
  • @ScottChamberlain, ah, thanks for the reminder about `Try...`, I didn't fall into that trap by accident, but now I won't fall thanks to you. – astrowalker Oct 06 '16 at 13:41
  • 2
    From the comments on the other answer, it might be a good idea to put the result of the `Register` call into a `using` block to have it properly disposed of. I believe this should avoid the mentioned resource leak. – ygoe May 01 '18 at 17:01
  • 2
    @ygoe thanks, done. Though I consider this a very minor leak, but of course if one can avoid it - one should avoid it. – Evk May 01 '18 at 17:21
18

I recommend that you do not build this yourself. There are a number of edge cases around cancellation tokens that are tedious to get right. For example, if the registration returned from Register is never disposed, you can end up with a resource leak.

Instead, you can use the Task.WaitAsync extension method from my AsyncEx.Tasks library:

var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b => 
{ 
  if (b) 
    task_completion_source.SetResult(true); 
});
await task_completion_source.Task.WaitAsync(cancellationToken);

On a side note, I'd strongly encourage you to use ToTask rather than an explicit TaskCompletionSource. Again, ToTask handles edge cases nicely for you.

Stephen Cleary
  • 437,863
  • 77
  • 675
  • 810
  • Thank you, but of course I am curious -- can you mention edge cases of using `TaskCompletionSource` (or point to them)? So far I didn't have problems with using it. – astrowalker Oct 07 '16 at 08:15
  • 1
    `Set*` will throw if the task is already completed. `Set*` and `TrySet*` by default allow synchronous continuations (your callstack after the `await` may be *inside* `Subscribe`. The Task by default allows child tasks. – Stephen Cleary Oct 07 '16 at 10:37
  • @StephenCleary, under what circumstances would such an edge case (Registration not disposed) occur? – Justin Skiles Jun 01 '17 at 17:54
  • 2
    @JustinSkiles: If the registration is not disposed (e.g., using the code in the accepted answer), *and* if you have a real (cancelable) `CancellationToken` that is long-lived (i.e., representing process shutdown), then the registration is kept alive, and so is the TCS. If this code is executed multiple times during a process run, then each time it's run another registration and TCS are leaked. – Stephen Cleary Jun 01 '17 at 19:17
  • @StephenCleary Your library is really essential for working with Tasks. Have you made any proposals in the CoreFx repo for any of the many things you've added in AsyncEx? – MgSam Jan 26 '18 at 20:52
  • @MgSam: No. Members of the team are aware of my library. I think they view it as a nice add-on, so NuGet is a good fit. – Stephen Cleary Jan 26 '18 at 22:20
  • 1
    Somehow it's a contradiction: don't write your own I wrote one myself :). – Wouter Sep 30 '20 at 12:26
2

Here was my stab at writing this myself. I almost made the mistake for not disposing the Register (thanks to Stephen Cleary)

    /// <summary>
    /// This allows a TaskCompletionSource to be await with a cancellation token and timeout.
    /// 
    /// Example usable:
    /// 
    ///     var tcs = new TaskCompletionSource<bool>();
    ///           ...
    ///     var result = await tcs.WaitAsync(timeoutTokenSource.Token);
    /// 
    /// A TaskCanceledException will be thrown if the given cancelToken is canceled before the tcs completes or errors. 
    /// </summary>
    /// <typeparam name="TResult">Result type of the TaskCompletionSource</typeparam>
    /// <param name="tcs">The task completion source to be used  </param>
    /// <param name="cancelToken">This method will throw an OperationCanceledException if the cancelToken is canceled</param>
    /// <param name="timeoutMs">This method will throw a TimeoutException if it doesn't complete within the given timeout, unless the timeout is less then or equal to 0 or Timeout.Infinite</param>
    /// <param name="updateTcs">If this is true and the given cancelToken is canceled then the underlying tcs will also be canceled.  If this is true a timeout occurs the underlying tcs will be faulted with a TimeoutException.</param>
    /// <returns>The tcs.Task</returns>
    public static async Task<TResult> WaitAsync<TResult>(this TaskCompletionSource<TResult> tcs, CancellationToken cancelToken, int timeoutMs = Timeout.Infinite, bool updateTcs = false)
    {
        // The overrideTcs is used so we can wait for either the give tcs to complete or the overrideTcs.  We do this using the Task.WhenAny method.
        // one issue with WhenAny is that it won't return when a task is canceled, it only returns when a task completes so we complete the
        // overrideTcs when either the cancelToken is canceled or the timeoutMs is reached.
        //
        var overrideTcs = new TaskCompletionSource<TResult>();
        using( var timeoutCancelTokenSource = (timeoutMs <= 0 || timeoutMs == Timeout.Infinite) ? null : new CancellationTokenSource(timeoutMs) )
        {
            var timeoutToken = timeoutCancelTokenSource?.Token ?? CancellationToken.None;
            using( var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, timeoutToken) )
            {
                // This method is called when either the linkedTokenSource is canceled.  This lets us assign a value to the overrideTcs so that
                // We can break out of the await WhenAny below.
                //
                void CancelTcs()
                {
                    if( updateTcs && !tcs.Task.IsCompleted )
                    {
                        // ReSharper disable once AccessToDisposedClosure (in this case, CancelTcs will never be called outside the using)
                        if( timeoutCancelTokenSource?.IsCancellationRequested ?? false )
                            tcs.TrySetException(new TimeoutException($"WaitAsync timed out after {timeoutMs}ms"));
                        else
                            tcs.TrySetCanceled();
                    }

                    overrideTcs.TrySetResult(default(TResult));
                }

                using( linkedTokenSource.Token.Register(CancelTcs) )
                {
                    try
                    {
                        await Task.WhenAny(tcs.Task, overrideTcs.Task);
                    }
                    catch { /* ignore */ }

                    // We always favor the result from the given tcs task if it has completed.
                    //
                    if( tcs.Task.IsCompleted )
                    {
                        // We do another await here so that if the tcs.Task has faulted or has been canceled we won't wrap those exceptions
                        // in a nested exception.  While technically accessing the tcs.Task.Result will generate the same exception the
                        // exception will be wrapped in a nested exception.  We don't want that nesting so we just await.
                        await tcs.Task;
                        return tcs.Task.Result;
                    }

                    // It wasn't the tcs.Task that got us our of the above WhenAny so go ahead and timeout or cancel the operation.
                    //
                    if( timeoutCancelTokenSource?.IsCancellationRequested ?? false )
                        throw new TimeoutException($"WaitAsync timed out after {timeoutMs}ms");

                    throw new OperationCanceledException();
                }
            }
        }
    }

This with throw a TaskCanceledException if the cancelToken is canceled before the tcs gets a result or errors.

Tod Cunningham
  • 3,691
  • 4
  • 30
  • 32