I have a blocking operation that reads from a queue, but it can take a timeout. I can easily convert this to an "async" operation:

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        return await Task.Run(() =>
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Try receiving for one second
                IMessage message = consumer.Receive(TimeSpan.FromSeconds(1));

                if (message != null)
                {
                    return message;
                }
            }
        }, cancellationToken).ConfigureAwait(false);
    }

Aborting a thread is generally considered bad practice since you can leak resources, so the timeout seems like the only way to cleanly stop a thread. So I have three questions:

  1. What is a generally accepted timeout value for "immediate" cancellation?
  2. For libraries that provide built-in async methods, does immediate cancellation truly exist or do they also use timeouts and loops to simulate it? Maybe the question here is how would you make use of software interrupts and if these also have to do some sort of polling to check if there are interrupts, even if it's at the kernel/CPU level.
  3. Is there some alternate way I should be approaching this?

Edit: So I may have found part of my answer with Thread.Interrupt() and then handling ThreadInterruptedException. Is this basically a kernel-level software interrupt and as close to "immediate" as we can get? Would the following be a better way of handling this?

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var completionSource = new TaskCompletionSource<IMessage>();

        var receiverThread = new Thread(() =>
        {
            try
            {
                completionSource.SetResult(consumer.Receive());
            }
            catch (ThreadInterruptedException)
            {
                completionSource.SetCanceled();
            }
            catch (Exception ex)
            {
                completionSource.SetException(ex);
            }
        });

        cancellationToken.Register(receiverThread.Interrupt);

        receiverThread.Name = "Queue Receive";
        receiverThread.Start();

        return await completionSource.Task.ConfigureAwait(false);
    }
i3arnon
Nelson Rothermel
  • Some APIs *do* support immediate cancellation because they can put the thread into a wait state rather than polling. But if the API you're using doesn't support a CancellationToken, you're kinda stuck with polling. There's no "generally accepted" value for the timeout in this scenario because that is application specific... Interrupt is not kernel-level; there's no such thing on the native side. The reason to throw the exception is to inform the caller of cancellation; if a null message is good enough, you may not need to throw. – Peter Ritchie Sep 14 '14 at 23:16
  • @PeterRitchie: From further testing, in my case the API does put the thread into WaitSleepJoin and `Thread.Interrupt` will only interrupt a thread in that state. In fact, if I interrupt before starting the thread, the thread runs until the `Receive()` then throws `ThreadInterruptedException`. It seems to me the order of preference in my scenario is `CancellationToken`, `Thread.Interrupt`, block with timeout (most of the time you're listening for receive), poll (not blocking receive, most of the time you're sleeping). `Thread.Interrupt` appears to be the first feasible case for me. – Nelson Rothermel Sep 14 '14 at 23:59
  • Possibly, it's hard to tell what damage calling Interrupt has on the consumer instance though. – Peter Ritchie Sep 15 '14 at 00:30
  • @PeterRitchie: Good point... `Thread.Interrupt` seems to cleanly close only when all of my worker threads are in WaitSleepJoin. If work is actually being done my connection object becomes invalidated across all threads so I get `ThreadInterruptedException` in other unexpected areas. :( Looks like I'm back to the blocking receive with timeout. I guess I can only reliably use `Thread.Interrupt` when I control all the code. – Nelson Rothermel Sep 15 '14 at 01:57
  • Yes, I think the timeout is the safest bet. – Peter Ritchie Sep 15 '14 at 02:11

1 Answer

  1. It depends on your specific needs. A second could be immediate for some and slow for others.
  2. Libraries (good ones) which provide async API do so from the bottom up. They usually don't wrap blocking (synchronous) operations with a thread to make them seem asynchronous. They use TaskCompletionSource to create truly async methods.
  3. I'm not sure what you mean by queue (the built-in Queue in .NET doesn't have a Receive method), but you should probably be using a truly async data structure like TPL Dataflow's BufferBlock.
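
To illustrate points 2 and 3, here is a minimal sketch of how an async queue can be built "from the bottom up" with TaskCompletionSource, so that cancellation completes the pending receive right away instead of waiting for a polling interval. `SimpleAsyncQueue<T>` is a hypothetical name made up for this example; it is not part of .NET, and it is not how BufferBlock is actually implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal async queue, for illustration only.
public sealed class SimpleAsyncQueue<T>
{
    private readonly object _gate = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly Queue<TaskCompletionSource<T>> _waiters =
        new Queue<TaskCompletionSource<T>>();

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            // Hand the item to the first waiter that has not already been cancelled.
            while (_waiters.Count > 0)
            {
                if (_waiters.Dequeue().TrySetResult(item))
                {
                    return;
                }
            }

            // Nobody is waiting: store the item for a later receive.
            _items.Enqueue(item);
        }
    }

    public async Task<T> ReceiveAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        TaskCompletionSource<T> waiter;
        lock (_gate)
        {
            if (_items.Count > 0)
            {
                return _items.Dequeue();
            }

            // No thread blocks here; the caller just awaits this task.
            waiter = new TaskCompletionSource<T>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
        }

        // Cancelling the token completes the task as cancelled, with no polling.
        using (cancellationToken.Register(() => waiter.TrySetCanceled()))
        {
            return await waiter.Task.ConfigureAwait(false);
        }
    }
}
```

A cancelled waiter stays in the internal queue until the next Enqueue skips over it, which is good enough for a sketch; a production structure such as BufferBlock handles far more cases.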

About your specific code sample: you are holding up a thread throughout the entire operation (that's async over sync), which is costly. You could instead try to consume quickly and then wait asynchronously for the timeout to end, or for the CancellationToken to be cancelled. There's also no point in using another thread with Task.Run. You can simply move the loop into the body of ReceiveAsync:

    public async Task<IMessage> ReceiveAsync(CancellationToken cancellationToken)
    {
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Try a quick, non-blocking receive (assumes the consumer exposes a TryReceive method)
            IMessage message;
            if (consumer.TryReceive(out message) && message != null)
            {
                return message;
            }

            // Nothing available yet: wait asynchronously. Cancelling the token ends the delay right away.
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
        }
    }

If your queue implements IDisposable, a different (harsher) option would be to call Dispose on it when the CancellationToken is cancelled.
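
A rough sketch of that Dispose-on-cancel idea follows. `FakeConsumer` is a hypothetical stand-in invented for this example; it assumes that disposing the consumer makes a blocked Receive() throw ObjectDisposedException, which your real queue client may or may not do, so check its documentation first.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical consumer whose blocking Receive() fails once it is disposed.
public sealed class FakeConsumer : IDisposable
{
    private readonly BlockingCollection<string> _messages = new BlockingCollection<string>();
    private readonly CancellationTokenSource _disposed = new CancellationTokenSource();

    public void Send(string message) => _messages.Add(message);

    public string Receive()
    {
        try
        {
            // Blocks until a message arrives or the consumer is disposed.
            return _messages.Take(_disposed.Token);
        }
        catch (OperationCanceledException)
        {
            throw new ObjectDisposedException(nameof(FakeConsumer));
        }
    }

    public void Dispose() => _disposed.Cancel();
}

public static class ConsumerExtensions
{
    public static async Task<string> ReceiveAsync(
        this FakeConsumer consumer, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Dispose the consumer as soon as the token is cancelled.
        using (cancellationToken.Register(consumer.Dispose))
        {
            try
            {
                // Still async over sync: a pool thread is blocked while waiting.
                return await Task.Run(() => consumer.Receive()).ConfigureAwait(false);
            }
            catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
            {
                // Translate the failure caused by Dispose into a normal cancellation.
                throw new OperationCanceledException(cancellationToken);
            }
        }
    }
}
```

The trade-off is that the consumer is unusable after cancellation, so this only fits cases where cancelling means you are done with it anyway.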

i3arnon