17

I have the following line of code used to read asynchronously from a NetworkStream:

int bytesRead = await Task<int>.Factory.FromAsync(this.stream.BeginRead, this.stream.EndRead, buffer, 0, buffer.Length, null);

I'd like to make it support cancellation. I see that I can cancel tasks using a CancellationTokenSource; however, I don't see any way to pass its token to TaskFactory.FromAsync().

Is it possible to make a FromAsync()-constructed task support cancellation?

Edit: I want to cancel a task that is already running.

Gigi
  • Just to clarify: I want to cancel a task that may already be running. – Gigi Jul 27 '14 at 11:30
  • It seems that there is no overload of `FromAsync` that takes a cancellation token. One possible solution would be to add one more layer: start your own action with `FromAsync`, and within that custom action use another `Task` that supports cancellation from outside to read the stream. – keenthinker Jul 27 '14 at 11:36
  • Do you have a good reason not to use `NetworkStream.ReadAsync`, which supports `CancellationToken`? – avo Jul 27 '14 at 11:37
  • @avo NetworkStream.ReadAsync is Stream.ReadAsync, and the latter just throws away the token. Basically not supported. – usr Jul 27 '14 at 11:41
  • @usr, Are you sure it's impossible to cancel this? I suspect the OP could register a callback with `CancellationToken.Register` and call `NetworkStream.Dispose` from there. That should cancel the pending read. – avo Jul 27 '14 at 11:44

3 Answers

12

Gigi, unfortunately FromAsync by its nature only adapts an existing asynchronous operation to the API of the TPL (Microsoft's Task Parallel Library).

In essence, TPL's ReadAsync controls the async behaviour itself, whilst FromAsync only wraps the behaviour (but doesn't control it).

Now, since cancellation is a TPL-specific construct and FromAsync has no control over the inner workings of the async method being called, there is no guaranteed way to cleanly cancel the task and ensure that all resources are closed correctly (which is why it was omitted; if you're curious, just decompile the method ;)).

In these situations, it makes more sense to wrap the actual async call yourself in a normal task and detect the OperationCanceledException, which will give you the opportunity to close your stream by making the appropriate calls.

In short, the answer is no, but there is nothing stopping you from creating a generic overloaded method that will pick the correct strategy to cleanly close a stream depending on its type.

Stefan Z Camilleri
  • 1
    The question is about cancellation, and this text says much about resource cleanup, which isn't the async operation's concern in itself: you have to do it whether it completes or cancels. The other reply is more to the point: there isn't a solid concept of cancellation in the older API, only case-specific recipes. – hypersw Sep 25 '16 at 01:12
9

As others have already mentioned, there is no clean way of achieving what you're asking for. The notion of cancellation was absent from the Asynchronous Programming Model; thus, it couldn't be retrofitted through the FromAsync converters.

However, you can introduce cancellation for the Task that wraps the asynchronous operation. This will not cancel the underlying operation itself – your NetworkStream would still continue reading all the requested bytes from the socket – but it will permit your application to react as if the operation was cancelled, immediately throwing an OperationCanceledException from your await (and executing any registered task continuations). The result of the underlying operation, once completed, will be ignored.

This is a helper extension method:

using System.Threading;
using System.Threading.Tasks;

public static class TaskExtensions
{
    public static async Task<TResult> HandleCancellation<TResult>(
        this Task<TResult> asyncTask,
        CancellationToken cancellationToken)
    {
        // Create another task that completes as soon as cancellation is requested.
        // http://stackoverflow.com/a/18672893/1149773
        var tcs = new TaskCompletionSource<TResult>();

        // Dispose the registration on exit so that a long-lived token
        // does not keep accumulating callbacks.
        using (cancellationToken.Register(() =>
            tcs.TrySetCanceled(), useSynchronizationContext: false))
        {
            var cancellationTask = tcs.Task;

            // Create a task that completes when either the async operation completes,
            // or cancellation is requested.
            var readyTask = await Task.WhenAny(asyncTask, cancellationTask);

            // In case of cancellation, register a continuation to observe any unhandled
            // exceptions from the asynchronous operation (once it completes).
            // In .NET 4.0, unobserved task exceptions would terminate the process.
            if (readyTask == cancellationTask)
                asyncTask.ContinueWith(_ => asyncTask.Exception,
                    TaskContinuationOptions.OnlyOnFaulted |
                    TaskContinuationOptions.ExecuteSynchronously);

            return await readyTask;
        }
    }
}

And this is an example that uses the extension method to treat an operation as cancelled after 300ms:

CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(300));

try
{
    int bytesRead = 
        await Task<int>.Factory.FromAsync(this.stream.BeginRead, this.stream.EndRead, buffer, 0, buffer.Length, null)
                               .HandleCancellation(cts.Token);
}
catch (OperationCanceledException)
{
    // Operation took longer than 300ms, and was treated as cancelled.
}
Douglas
  • This works but be aware that the IO will still cause side effects and effectively make the stream position undefined. The stream becomes unusable. This doesn't really avoid the need to open a new stream. – usr Jul 27 '14 at 14:41
  • Whilst this is a very nice method, I am not too sure this answers the user's question. First of all, binding a hardcoded timeout to an IO operation is not the correct approach, since IO performance is hardware-dependent. The extension method is also good, yet it does not handle the IO resources cleanly, as @usr correctly points out. – Stefan Z Camilleri Jul 27 '14 at 14:43
  • The timeout was just an example to demonstrate the use of the extension method. @usr is correct, although the limitation arises from the `Stream` class's inability to handle concurrent reads. The extension method would work without issue on classes that can run asynchronous operations concurrently. – Douglas Jul 27 '14 at 14:48
6

No, there is no generic way to cancel such a task. Cancellation is API specific.

  • For example, WebClient has a CancelAsync method.
  • A Socket or a FileStream needs to be closed to cancel an outstanding call.
  • Web-service clients have yet other ways of aborting calls.
  • ...

This is because the implementer of the IO operation must support cancellation.
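As a rough illustration of the "close it to cancel" recipe for the NetworkStream in the question, here is a minimal sketch. The helper name ReadOrCloseAsync is hypothetical (it is not a framework API), and the assumption that the aborted read surfaces as an ObjectDisposedException or IOException may not hold for every framework version, so treat the exception filter as something to verify rather than as a guarantee.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public static class NetworkStreamExtensions
{
    // Hypothetical helper: aborts a pending read by closing the stream
    // when the token is cancelled. The stream cannot be reused afterwards.
    public static async Task<int> ReadOrCloseAsync(
        this NetworkStream stream,
        byte[] buffer,
        CancellationToken cancellationToken)
    {
        // Closing the stream is the API-specific "cancel" for a socket read.
        // The registration is disposed as soon as the read finishes, so a
        // later cancellation does not close a stream that is still in use.
        using (cancellationToken.Register(
            () => stream.Close(), useSynchronizationContext: false))
        {
            try
            {
                return await Task<int>.Factory.FromAsync(
                    stream.BeginRead, stream.EndRead,
                    buffer, 0, buffer.Length, null);
            }
            catch (Exception ex) when (
                cancellationToken.IsCancellationRequested &&
                (ex is ObjectDisposedException || ex is IOException))
            {
                // Assumption: a read aborted by Close() fails with one of
                // the two exception types above. Translate it so callers
                // can handle it like any other cancellation.
                throw new OperationCanceledException(
                    "The read was aborted by closing the stream.",
                    ex, cancellationToken);
            }
        }
    }
}
```

A caller would then write something like `int bytesRead = await stream.ReadOrCloseAsync(buffer, cts.Token);` and catch OperationCanceledException. The price of this approach is that the connection is gone after a cancellation and has to be reopened.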

It might seem tempting to use NetworkStream.ReadAsync and pass a cancellation token, but NetworkStream.ReadAsync is just Stream.ReadAsync, and the latter throws away the token. Cancellation is basically not supported.

Stream.ReadAsync is just the base class method. It does not do anything by itself. Concrete IO operations are issued only by derived classes, which must support cancellation natively; Stream can't do anything to force them. It happens that NetworkStream doesn't support cancellation.

I understand that you want to cancel the operation and leave the socket open. But it is not possible. (Subjective note: This is really a sad state of affairs. Especially considering that Windows supports cancellable IO at the Win32 level.)

If you still want your app to quickly continue, although the IO operation is not cancellable, just ignore the result of that task and continue. Be aware that eventually the IO might complete and for example drain data from the socket buffers or cause other side-effects.

"Cancelling by ignoring" effectively makes the stream position undefined. The stream becomes unusable. This doesn't really avoid the need to open a new stream. You still have to get rid of the old stream (in most cases) and reopen. Also, you are introducing concurrency.

usr