I am trying to understand some code (for performance reasons) that processes tasks from a queue. The code is C# on .NET Framework 4.8 (and I didn't write it).
I have this code creating a timer that, from what I can tell, should use a new thread every 10 seconds:
_myTimer = new Timer(new TimerCallback(OnTimerGo), null, 0, 10000 );
OnTimerGo calls CheckForTasks(), which calls DoTask(). Inside DoTask() it grabs a task off a queue and then does this:
System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task).ContinueWith(c => DoTask());
My reading of this is that a new thread should start running OnTimerGo every 10 seconds, and that thread should then run ProcessTask in parallel on tasks as fast as it can get them from the queue.
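To make that reading concrete, here is a simplified, self-contained sketch of the pattern as I understand it. It is not the real code: the in-memory ConcurrentQueue, the Thread.Sleep standing in for the actual work, and the names SchedulerSketch, Run and the counters are all made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerSketch
{
    // Hypothetical in-memory queue standing in for the database-backed queue.
    private static readonly ConcurrentQueue<int> Queue = new ConcurrentQueue<int>();
    private static int _processed;   // items finished so far
    private static int _running;     // ProcessTask calls currently executing
    private static int _peakRunning; // highest value _running ever reached

    // Stand-in for the real ProcessTask: tracks concurrency and sleeps briefly.
    private static void ProcessTask(object state)
    {
        int running = Interlocked.Increment(ref _running);
        int peak;
        while (running > (peak = Volatile.Read(ref _peakRunning)))
        {
            Interlocked.CompareExchange(ref _peakRunning, running, peak);
        }
        Thread.Sleep(5); // assumption: the real work is short
        Interlocked.Decrement(ref _running);
        Interlocked.Increment(ref _processed);
    }

    // Same shape as DoTask in the real code: take one item, run it on the
    // pool, and call DoTask again once that item has finished.
    private static bool DoTask()
    {
        int item;
        if (!Queue.TryDequeue(out item))
        {
            return false; // nothing queued, so this chain ends here
        }
        Task.Factory.StartNew(ProcessTask, item).ContinueWith(c => DoTask());
        return true;
    }

    // Queues itemCount items, starts the timer, waits until all are processed.
    public static int Run(int itemCount, int periodMs, out int peakRunning)
    {
        int discarded;
        while (Queue.TryDequeue(out discarded)) { } // reset state between runs
        _processed = 0;
        _running = 0;
        _peakRunning = 0;

        for (int i = 0; i < itemCount; i++)
        {
            Queue.Enqueue(i);
        }

        // Each tick that finds work starts one more DoTask chain.
        using (var timer = new Timer(_ => DoTask(), null, 0, periodMs))
        {
            while (Volatile.Read(ref _processed) < itemCount)
            {
                Thread.Sleep(10);
            }
        }

        peakRunning = Volatile.Read(ref _peakRunning);
        return Volatile.Read(ref _processed);
    }
}
```

Calling something like `int peak; int done = SchedulerSketch.Run(200, 100, out peak);` lets me compare the peak number of simultaneous ProcessTask calls with the number of timer ticks. In this sketch each chain only ever has one ProcessTask in flight, so the peak cannot exceed the number of ticks that found work.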
I inserted some code to call ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads to figure out how many threads were in use. Then I queued up 10,000 things for it to do and let it loose.
I never see more than 4 threads in use at a time. This is running on a c4.4xlarge EC2 instance, so 16 vCPUs and 30 GB of memory. GetMaxThreads and GetAvailableThreads both return over 2,000, so I would expect more threads. Looking at the logging, I can see that a total of 50ish different threads (by thread id) end up doing the work over the course of 20 minutes. Since the timer is set to every 10 seconds, I would expect roughly 100 threads to be doing the work (or for it to finish sooner).
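For reference, the two measurements above (worker threads in use, and distinct thread ids doing the work) can be collected with something like the following. This is only a sketch of the idea, not the logging that is actually in the code base: ThreadStats and its members are hypothetical names, and in the real code the thread ids come from the log files rather than a dictionary.

```csharp
using System.Collections.Concurrent;
using System.Threading;

public static class ThreadStats
{
    // Hypothetical tally: managed thread id -> number of tasks it has run.
    private static readonly ConcurrentDictionary<int, int> TasksPerThread =
        new ConcurrentDictionary<int, int>();

    // Worker threads in use, computed as max minus available.
    public static int WorkerThreadsInUse()
    {
        int maxWorker, maxIo, availableWorker, availableIo;
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        ThreadPool.GetAvailableThreads(out availableWorker, out availableIo);
        return maxWorker - availableWorker;
    }

    // Call at the start of each task to record which thread is running it.
    public static void RecordCurrentThread()
    {
        TasksPerThread.AddOrUpdate(
            Thread.CurrentThread.ManagedThreadId, 1, (id, count) => count + 1);
    }

    // Number of different thread ids seen so far.
    public static int DistinctThreads
    {
        get { return TasksPerThread.Count; }
    }

    // Total number of recorded tasks across all threads.
    public static int TotalTasks
    {
        get
        {
            int total = 0;
            foreach (var entry in TasksPerThread)
            {
                total += entry.Value;
            }
            return total;
        }
    }
}
```

Calling ThreadStats.RecordCurrentThread() at the top of ProcessTask and logging ThreadStats.WorkerThreadsInUse() alongside it would give the same kind of numbers I am quoting here.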
Looking at the code, the only time a running thread should stop is if it asks for a task from the queue and doesn't get one. Some other logging shows that there are never more than 2 tasks running in a thread. This is probably because the work is pretty fast. So the threads shouldn't be exiting, and I can even see from the logs that many of them end up doing as many as 500 tasks over the 20 minutes.
So... what am I missing here? Are ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads not accurate if run from inside a thread? Is something shutting down some of the threads while letting others keep going?
EDIT: adding more code
public static void StartScheduler()
{
    lock (TimerLock)
    {
        if (_timerShutdown == false)
        {
            _myTimer = new Timer(new TimerCallback(OnTimerGo), null, 0, 10 );
            const int numberOfSecondsPerMinute = 60;
            const int margin = 1;
            var pollEventsPerMinute = (numberOfSecondsPerMinute/SystemPreferences.TaskPollingIntervalSeconds);
            _numberOfTimerCallsForHeartbeat = pollEventsPerMinute - margin;
        }
    }
}
private static void OnTimerGo(object state)
{
    try
    {
        _lastTimer = DateTime.UtcNow;
        var currentTickCount = Interlocked.Increment(ref _timerCallCount);
        if (currentTickCount == _numberOfTimerCallsForHeartbeat)
        {
            Interlocked.Exchange(ref _timerCallCount, 0);
            MonitoringTools.SendHeartbeatMetric(Heartbeat);
        }
        CheckForTasks();
    }
    catch (Exception e)
    {
        Log.Warn("Scheduler: OnTimerGo exception", e);
    }
}
public static void CheckForTasks()
{
    try
    {
        if (DoTask())
            _lastStart = DateTime.UtcNow;
        _lastStartOrCheck = DateTime.UtcNow;
    }
    catch (Exception e)
    {
        Log.Error("Unexpected exception checking for tasks", e);
    }
}
private static bool DoTask()
{
    Func<DataContext, bool> a = db =>
    {
        var mtid = Thread.CurrentThread.ManagedThreadId;
        int totalThreads = Process.GetCurrentProcess().Threads.Count;
        int maxWorkerThreads;
        int maxPortThreads;
        ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxPortThreads);
        int AvailableWorkerThreads;
        int AvailablePortThreads;
        ThreadPool.GetAvailableThreads(out AvailableWorkerThreads, out AvailablePortThreads);
        int usedWorkerThreads = maxWorkerThreads - AvailableWorkerThreads;
        string usedThreadMessage = $"Thread {mtid}: Threads in Use count: {usedWorkerThreads}";
        Log.Info(usedThreadMessage);
        var taskTypeAndTasks = GetTaskListTypeAndTasks();
        var task = GetNextTask(db, taskTypeAndTasks.Key, taskTypeAndTasks.Value);
        if (_timerShutdown)
        {
            Log.Debug("Task processing stopped.");
            return false;
        }
        if (task == null)
        {
            Log.DebugFormat("DoTask: Idle in thread {0} ({1} tasks running)", mtid, _processingTaskLock);
            return false;
        }
        Log.DebugFormat("DoTask: starting task {2}:{0} on thread {1}", task.Id, mtid, task.Class);
        System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task).ContinueWith(c => DoTask());
        Log.DebugFormat("DoTask: done ({0})", mtid);
        return true;
    };
    return DbExtensions.WithDbWrite(ctx => a(ctx));
}