0

I have a C# console application that uses Nancy and Nancy.Hosting.Self.

The idea is that it will serve an API via Nancy, and the main application will routinely poll a number of connections to various application + obtain data from those connections when request via the API (via Nancy).

So I will have 2 running processes, the constant polling and the HTTP server.

My Program.cs contains the following snippets.

Task pollTask = null;
try {
  pollTask = Task.Run(async () => {
    while (processTask) {
      connectionPool.PollEvents();

      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
    keepRunning = false;
  }, wtoken.Token);
}
catch (AggregateException ex) {
  Console.WriteLine(ex);
}
catch (System.Threading.Tasks.TaskCanceledException ex) {
  Console.WriteLine("Task Cancelled");
  Console.WriteLine(ex);
}

And later...

using (var host = new Nancy.Hosting.Self.NancyHost(hostConfigs, new Uri(serveUrl))) {
  host.Start();
  // ...
  // routinely checking console for a keypress to quit which then sets
  // processTask to false, which would stop the polling task, which
  // in turn sets keepRunning to false which stops the application entirely.
}

The polling task seems to just die/stop without any output to the console to indicate why it stopped. In the check for the console input keypress, I also query the pollTask.Status, and it eventually details "Faulted". But I've no idea why. I'm also questioning the reliability of a long/forever running Task.

To prevent this being vague, I have one primary question. Is Task suitable for a forever running task in the manner presented above. If it is not, what should I be using instead to achieve 2 parallel proceses one of which being Nancy.

UPDATE (17/07/2018):
After taking on board the suggestions and answers thus far, I have been able to ascertain the exception that is eventually occuring and killing the process:

PollEvents process appears to be throwing an exception...
System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
   at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
   at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
   at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at ApiProxy.ServiceA.Connection.Heartbeat() in \api-proxy\src\ServiceA\Connection.cs:line 274
   at ApiProxy.ServiceA.Connection.PollEvents(Nullable`1 sinceEventId) in \api-proxy\src\ServiceA\Connection.cs:line 313
   at ApiProxy.ConnectionPool.PollEvents() in \api-proxy\src\ConnectionPool.cs:line 50
   at ApiProxy.Program.<>c.<<Main>b__5_0>d.MoveNext() in \api-proxy\Program.cs:line 172
---> (Inner Exception #0) System.InvalidOperationException: There were not enough free threads in the ThreadPool to complete the operation.
   at System.Net.HttpWebRequest.BeginGetRequestStream(AsyncCallback callback, Object state)
   at System.Net.Http.HttpClientHandler.StartGettingRequestStream(RequestState state)
   at System.Net.Http.HttpClientHandler.PrepareAndStartContentUpload(RequestState state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at ApiProxy.ServiceA.Connection.<>c__DisplayClass22_0.<<Heartbeat>b__0>d.MoveNext() in \api-proxy\src\ServiceA\Connection.cs:line 281<---

Because the try...catch is now inside the Task, it means while this error happens repeatedly and at speed, eventually it appears to rectify itself. However it would be ideal to be able to query the availability of the ThreadPool before demanding more of it. It also looks like the ThreadPool issue isn't related to my code, but instead of Nancy.

After looking up the source of the error, I've identified it happening in the following:

public bool Heartbeat() {
  if (connectionConfig.events.heartbeatUrl == "") {
    return false;
  }

  var val = false;
  var task = Task.Run(async () => {
    var heartbeatRequest = new HeartbeatRequest();
    heartbeatRequest.host = connectionConfig.host;
    heartbeatRequest.name = connectionConfig.name;
    heartbeatRequest.eventId = lastEventId;

    var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
    var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));

    // todo: create a heartbeatResponse extending a base response type
    PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
    if (heartbeatResponse != null) {
      Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
      val = heartbeatResponse.success;
    }
    else {
      // todo: sentry?
    }
  });
  task.Wait();

  return val;
}

I'm wrapping the calls in a Task because otherwise I end up with a sea of async definitions everywhere. Is this the possible source of the ThreadPool starvation?

UPDATE 2
Corrected the above code by removing the Task.Run that wraps the PostAsync. The calling code would then call Heartbeat().Wait(), and so the method would now look like:

public async Task<bool> Heartbeat() {
  if (connectionConfig.events.heartbeatUrl == "") {
    return false;
  }

  var val = false;
  var heartbeatRequest = new HeartbeatRequest();
  heartbeatRequest.host = connectionConfig.host;
  heartbeatRequest.name = connectionConfig.name;
  heartbeatRequest.eventId = lastEventId;

  var prettyJson = JToken.Parse(JsonConvert.SerializeObject(heartbeatRequest)).ToString(Formatting.Indented);
  var response = await client.PostAsync(connectionConfig.events.heartbeatUrl, new StringContent(prettyJson, Encoding.UTF8, "application/json"));

  PingResponse heartbeatResponse = JsonConvert.DeserializeObject<PingResponse>(await response.Content.ReadAsStringAsync());
  if (heartbeatResponse != null) {
    Console.WriteLine("Heartbeat: " + heartbeatResponse.message);
    val = heartbeatResponse.success;
  }
  else {
    // todo: sentry?
  }

  return val;
}

Hope some of this experience of mine helps others. I'm not yet sure if the above changes (there were many like this) will prevent the Thread starvation.

Dave Goodchild
  • 322
  • 4
  • 9
  • 4
    Your `try/catch` is useless in this context: you are spawning a new `Task`, but not `await`ing for it (neither in a sychronous nor in an asynchronous manner), so there is nothing to catch there. You should move your `try/catch` logic inside the task or in a place where you `Wait()/await` it. – Federico Dipuma Jul 16 '18 at 09:04

2 Answers2

1

When the task fails... that is, when an exception happens inside of

pollTask = Task.Run(async () => {
        while (processTask) {
            connectionPool.PollEvents();

            await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
        }
        keepRunning = false;
    }, wtoken.Token);

The exception will be stored in the returned task. That is, Task.Exception. And the task will be marked faulted.

This exception won't be thrown in your context unless you call Task.Wait, await the task or similar. See below for what I suggest you to use.


It is likely that connectionPool.PollEvents(); eventually throws some exception, that because of what I mention above, you won't catch.

If you cannot prevent the exception, you probably want to handle it inside the task... unless we are talking about a condition that would mean to tear down connectionPool or something more drastic.

I do not know what could be making PollEvents trip off. One possibility is that you use the same object elsewhere and it is not thread safe.

Speaking of thread safety. I hope processTask and keepRunning are volatile. Although, as you will see below, you do not need them.


About a long running task, please use this approach:

Task.Factory.StartNew(mehtod, TaskCreationOptions.LongRunning);

Or any overload of Factory.StartNew that takes TaskCreationOptions.

Internally when you create a task with TaskCreationOptions.LongRunning it will dedicate a Thread to your task instead of stealing one from the thread pool (preventing starvation), and makes sure all works correctly with that setup.


Addendum: By StartNew is dangerous, WBuck means easy to get wrong. Aside from starting a Task, Task.Run will do error handling (which I am telling you to do) and set TaskCreationOptions.DenyChildAttach.

There is also a gotcha in that it does not recognize async methods (no overload takes Func<Task<TResult>> and using Func<TResult> when you pass Func<Task> is a gotcha moment).

Therefore, using an async method in StartNew, directly, is a bad idea. Of course, the naive approach is to wrap the async method in a regular one:

Task.Factory.StartNew
(
    ()=>
    {
        awyncMethod().Wait();
    },
    TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
);

However, that defies the purpose, because now the async method is in the thread pool, and we created a thread just to wait it.

What happens when we pass an async methods to Factory.StartNew, is that we get a task that represents the creation of the task that you actually want. Let us call it "faketask". This faketask will finish right away, and the task you want is the result... to use it correctly, you would want to add a continuation to the faketask (with the appropriate creation options), that retrieves the actual task. In addition, ideally, you want this continuation to work as a proxy of the actual task (giving you what it returns or what exception it threw). Thankfully TaskExtensions.Unwrap does all that.

Thus, we arrive to this:

Task.Factory.StartNew
(
    async ()=>
    {
        /*...*/
    },
    TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach
).Unwrap();

See also Parallel Programming with .NET - Task.Run vs Task.Factory.StartNew.


If you plan to keep multiple of these, all taking from the same connectionPool... first of all make sure that connectionPool is thread safe. Aside from that, you need a way to monitor the status of the tasks, and relaunch them. Use another long running task, that goes into Task.WaitAny of all of these and handles exceptions - and does logging - and relaunches them as long as the application should be kept alive. That is what I suggest you to use to wait on the tasks.

Another thing, you can use the CancellationToken to exit the loop, check CancellationToken.IsCancellationRequested. And for knowing that the task stopped, you can check Task.IsCompleted. That is why you do not need those variables.

Addendum: In fact, you can use a single CancellationToken tear it all down.

Theraot
  • 31,890
  • 5
  • 57
  • 86
  • Thanks for taking the time with the detail here. I'm going to try making the adjustments you've suggested and see what I can learn from those. – Dave Goodchild Jul 16 '18 at 11:19
  • My `processTask` and `keepRunning` were not marked as volatile, no. I've done that for the moment until I replace with correct use of cancellation tokens. – Dave Goodchild Jul 16 '18 at 11:29
  • I don't believe a method signature exists for `Task.Factory.StartNew(mehtod, TaskCreationOptions.LongRunning, token); `, instead I used `public Task StartNew(Action action, CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskScheduler scheduler)` – Dave Goodchild Jul 16 '18 at 11:55
  • @DaveGoodchild you are right, I must have messed it up in transcription. Fixed. – Theraot Jul 16 '18 at 11:59
  • 1
    As a side note about this solution and using `LongRunning`. The thread pool will adjust to losing a thread to a long-running operation after 2 seconds. In order to use `LongRunning` you're forced to use the `StartNew` api which can be dangerous. If you are seeing delays due to the thread pool injection rate, then you can carefully add `LongRunning`. In a majority of cases you don't need it. – WBuck Jul 16 '18 at 12:30
  • Since making the changes, I'm now seeing the task.Status changing to "RanToCompletion", which is weird, because I've never seen that before. Not sure how that task can still be running but have a status of "RanToCompletion"? – Dave Goodchild Jul 16 '18 at 14:21
  • Ahh, maybe this has something to do with the fact I'm using an async method. – Dave Goodchild Jul 16 '18 at 14:23
  • I see your update about Unwrap. I'll take another look at StartNew (yesterday I reverted it back to using Task.Run). – Dave Goodchild Jul 17 '18 at 08:16
0

As pointed out in the comments your try-catch is useless. You need to wait for the task to complete in order to see if an exception was thrown.

As a suggestion, instead of running in an endless loop, why not use a timer to periodically poll?

var pollTask = Task.Run(async () => 
 { 
    while (processTask) 
    {
      wToken.ThrowIfCancellationRequested();
      connectionPool.PollEvents();
      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
  keepRunning = false;
}, wtoken.Token);

try 
{
     pollTask.Wait(wToken);
}
catch( AggregateException ex )
{
    // Handle exception
}

Or, if your method is marked async you can just await the task. Await will unwrap the aggregate exception for you.

try 
{
 await Task.Run(async () => 
 { 
    while (processTask) 
    {
      wToken.ThrowIfCancellationRequested();
      connectionPool.PollEvents();
      await Task.Delay(configLoader.config.connectionPollDelay, wtoken.Token);
    }
  keepRunning = false;
}, wtoken.Token);
}
catch( Exception ex )
{
    // Handle exception
}

And for completeness, you can also inspect the exception property of the task in a continuation.

Task.Run( async ( ) =>
        {
            wToken.ThrowIfCancellationRequested( );
            connectionPool.PollEvents( );
            await Task.Delay( configLoader.config.connectionPollDelay, wToken );
        }, wToken ).ContinueWith( task =>
        {
            if( task.IsFaulted )
            {
                // Inspect the exception property of the task
                // to view the exception / exceptions.
                Console.WriteLine( task.Exception?.InnerException );
            }
        }, wToken );
WBuck
  • 5,162
  • 2
  • 25
  • 36
  • The purpose of my infinite loop is so that I can do other things, such as check the console for a "quit" message or something, so that I can cleanly close down connections etc – Dave Goodchild Jul 16 '18 at 11:26