-1

I've got a class with a static ConcurrentQueue. One class receives messages and puts them on the queue, whilst a different thread on this class reads them from that queue and processes them one at a time. That method is aborted with a cancellationtoken.

The method that empties the queue looks like this:

public async Task HandleEventsFromQueueAsync(CancellationToken ct, int pollDelay = 25)
{
    while (true)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        if(messageQueue.TryDequeue(out ConsumeContext newMessage))
        {
            handler.Handle(newMessage);
        }

        try
        {
            await Task.Delay(pollDelay, ct).ConfigureAwait(true);
        } 
        catch (TaskCanceledException)
        {
            return;
        }
    }
}

My testing methods look like this:

CancellationToken ct = source.Token;
Thread thread = new Thread(async () => await sut.HandleEventsFromQueueAsync(ct));
thread.Start();

EventListener.messageQueue.Enqueue(message1);
EventListener.messageQueue.Enqueue(message2);
await Task.Delay(1000);
source.Cancel(false);

mockedHandler.Verify(x => x.Handle(It.IsAny<ConsumeContext>()), Times.Exactly(2));

So I start my dequeueing method in its own thread, with a fresh cancellation token. Then I enqueue a couple of messages, give the process a second to handle them, and then use source.Cancel(false) to put an end to the thread and make the method return. Then I check that the handler was called the right number of times. Of course I'm testing this in a couple variations, with different message types and different times when I abort the dequeueing method.

The issue is that when I run any of my tests individually, they all succeed. But when I try to run them as a group, Visual Studio does not run every test. There's no error message, and the tests it does run succeed fine, but the run just stops after the second test.

visual studio test explorer

I do not have an idea why this happens. My tests are all identical in structure. I'm aborting the dequeueing thread properly every time.

What could compel Visual Studio to stop a test run, without throwing any kind of error?

KeizerHarm
  • 330
  • 1
  • 4
  • 22
  • Are you using the same `CancellationTokenSource` for all tests? The declaration of your `CancellationTokenSource` `source` variable is not included. – mortb Jul 06 '20 at 11:45
  • The source is initialised in the TestInitialize method. `source = new CancellationTokenSource();` It is just declared in the class scope. – KeizerHarm Jul 06 '20 at 11:47
  • 1
    Well then, if you cancel the `source.Token` in one test you will cancel all tests since it is the same token. You will need a different token for every test – mortb Jul 06 '20 at 11:49
  • @mortb I am making a new source and a new token for every test. It's just that the source is declared in TestInitialize (runs before every test), and the token is created every time in the test itself, but both should be fresh for each test.. – KeizerHarm Jul 06 '20 at 11:51
  • Maybe this is your problem? https://stackoverflow.com/a/51189555/1257728 As it comes to my mind, I have had that problem – mortb Jul 06 '20 at 12:08
  • @mjwills OK, I got my [minimal reproducible example](https://stackoverflow.com/help/minimal-reproducible-example). Turns out that the cause lies with my message deserialising protocol. I narrowed it down to a single call to GetProperties that makes the test run quit - but why would it do that? Am I being punished for combining the two devils of unittests, multithreading and reflection? – KeizerHarm Jul 06 '20 at 13:07
  • 1
    Newer code tend to not use `Thread` but `Task.Run(...)` https://stackoverflow.com/a/13429164/1257728 When you write `Thread(async () => await sut.HandleEventsFromQueueAsync(ct));` making the call to `HandleEventsFromQueueAsync` async void which will make your process crash, but since the code was called "async void" the exception is not caught by the calling code https://stackoverflow.com/a/45448104/1257728 – mortb Jul 06 '20 at 13:30
  • @mortb Interesting... I figured that Task was conceptually the wrong idea here; the dequeueing method runs continuously but does not ever yield a result; it is the driver for the rest of the application. – KeizerHarm Jul 06 '20 at 13:45
  • @mortb Aaaaaand yep. Ok, that was it. Bloody null pointer exception in the thread, that saw fit to abort my test run without ever telling me. Now question is how to fix it >:( – KeizerHarm Jul 06 '20 at 13:52

2 Answers2

1

I have solved my own issue. Turns out that the newly created thread threw an exception, and when threads throw exceptions those are ignored, but they still stop the unit test from happening. After fixing the issue causing the exception, the tests work fine.

KeizerHarm
  • 330
  • 1
  • 4
  • 22
1

You are passing an async lambda to the Thread constructor. The Thread constructor doesn't understand async delegates (does not accept a Func<Task> argument), so you end up with an async void lambda. Async void methods should be avoided for anything that it's not an event handler. What happens in your case is that the explicitly created thread is terminated when the code hits the first await, and the rest of the body runs in ThreadPool threads. It seems that the code never fails with an exception, otherwise the process would crash (this is the default behavior of async void methods).

Suggestions:

  1. Use a Task instead of a Thread. This way you'll have something to await before exiting the test.
CancellationToken ct = source.Token;
Task consumerTask = Task.Run(() => sut.HandleEventsFromQueueAsync(ct));

EventListener.messageQueue.Enqueue(message1);
EventListener.messageQueue.Enqueue(message2);
await Task.Delay(1000);
source.Cancel(false);
await consumerTask; // Wait the task to complete

mockedHandler.Verify(x => x.Handle(It.IsAny<ConsumeContext>()), Times.Exactly(2));
  1. Consider using a BlockingCollection or an asynchronous queue like a Channel instead of a ConcurrentQueue. Polling is an awkward and inefficient technique. With a blocking or async queue you'll not be obliged to do loops waiting for new messages to arrive. You'll be able to enter a waiting state, and notified instantly when a new message arrives.

  2. Configure the awaiting with ConfigureAwait(false). ConfigureAwait(true) is the default and does nothing.

  3. Consider propagating cancellation by throwing an OperationCanceledException. This is the standard way of propagating cancellation in .NET. So instead of:

if (ct.IsCancellationRequested) return;

...it is preferable to do this:

ct.ThrowIfCancellationRequested();
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Thank you for your help, and also all the suggestions. The reason I was using a ConcurrentQueue is because I need my producers to know when their message has been consumed. I have not yet looked into BlockingCollection, but from what I see Channels do not allow that behaviour. – KeizerHarm Jul 07 '20 at 09:47
  • 1
    @KeizerHarm how do the producers are notified when their message has been consumed? By enumerating the queue periodically? This would be also quite inefficient. A probably better way to establish a communication between the producers and the consumers could be to pass a `TaskCompletionSource` along with each message. The producer would `await` the completion of the `TaskCompletionSource.Task` property after sending the message, and the consumer would invoke the `TaskCompletionSource.SetResult` method to send the signal that the message has been consumed. – Theodor Zoulias Jul 07 '20 at 12:17
  • 1
    Thank you very much; that sounds like it could be what I need! I was indeed polling the queue from the other side as well. Performance is not terribly important; I don't expect to receive more than one message a minute anyway, but conceptually such an entity sounds much more appropriate so I will look into using that. – KeizerHarm Jul 07 '20 at 12:20
  • 1
    @KeizerHarm my pleasure! Small tip: when constructing a `TaskCompletionSource` you could consider passing the `RunContinuationsAsynchronously` option to the constructor. This generally makes things simpler because it dissociates the sending and the receiving of the signal. Without it, the same thread that invokes the `SetResult` at the consumer's site will immediately continue with running the code that follows the `await` of the `Task` at the producer's site, before returning to run the code after the `SetResult` at the consumer's site. – Theodor Zoulias Jul 07 '20 at 12:33