I am trying to understand some code (for performance reasons) that processes tasks from a queue. The code is C# on .NET Framework 4.8 (and I didn't write it).

I have this code creating a timer that from what I can tell should use a new thread every 10 seconds

    _myTimer = new Timer(new TimerCallback(OnTimerGo), null, 0, 10000);

OnTimerGo calls DoTask(). Inside DoTask() it grabs a task off a queue and then does this:

    System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task).ContinueWith(c => DoTask());

My reading of this is that a new thread should start running OnTimerGo every 10 seconds, and that thread should run ProcessTask in parallel on tasks as fast as it can get them from the queue.

I inserted some code to call ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads to figure out how many threads were in use. Then I queued up 10,000 things for it to do and let it loose.

I never see more than 4 threads in use at a time. This is running on a c4.4xlarge EC2 instance, so 16 vCPUs and 30 GB of memory. GetMaxThreads and GetAvailableThreads both return over 2k, so I would expect more threads. From the logging I can see that a total of 50ish different threads (by thread ID) end up doing the work over the course of 20 minutes. Since the timer fires every 10 seconds, I would expect about 120 threads to be doing the work (or for it to finish sooner).

Looking at the code, the only time a running thread should stop is if it asks for a task from the queue and doesn't get one. Some other logging shows that there are never more than 2 tasks running in a thread. This is probably because the work is pretty fast. So the threads shouldn't be exiting, and I can even see from the logs that many of them end up doing as many as 500 tasks over the 20 minutes.

So what am I missing here? Are ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads not accurate if run from inside a thread? Is something shutting down some of the threads while letting others keep going?

EDIT: adding more code

    public static void StartScheduler()
    {
        lock (TimerLock)
        {
            if (_timerShutdown == false)
            {
                _myTimer = new Timer(new TimerCallback(OnTimerGo), null, 0, 10 );
                const int numberOfSecondsPerMinute = 60;
                const int margin = 1;
                var pollEventsPerMinute = (numberOfSecondsPerMinute/SystemPreferences.TaskPollingIntervalSeconds);
                _numberOfTimerCallsForHeartbeat = pollEventsPerMinute - margin;
            }
        }
    }

    private static void OnTimerGo(object state)
    {
        try
        {                
            _lastTimer = DateTime.UtcNow;
            var currentTickCount = Interlocked.Increment(ref _timerCallCount);
            if (currentTickCount == _numberOfTimerCallsForHeartbeat)
            {
                Interlocked.Exchange(ref _timerCallCount, 0);
                MonitoringTools.SendHeartbeatMetric(Heartbeat);
            }

            CheckForTasks();
        }
        catch (Exception e)
        {
            Log.Warn("Scheduler: OnTimerGo exception", e);
        }
    }

    public static void CheckForTasks()
    {
        try
        {
            if (DoTask())
                _lastStart = DateTime.UtcNow;

            _lastStartOrCheck = DateTime.UtcNow;
        }
        catch (Exception e)
        {
            Log.Error("Unexpected exception checking for tasks", e);
        }
    }

    private static bool DoTask()
    {
        Func<DataContext, bool> a = db =>
        {
            var mtid = Thread.CurrentThread.ManagedThreadId;

            int totalThreads = Process.GetCurrentProcess().Threads.Count;

            int maxWorkerThreads;
            int maxPortThreads;
            ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxPortThreads);

            int AvailableWorkerThreads;
            int AvailablePortThreads;
            ThreadPool.GetAvailableThreads(out AvailableWorkerThreads, out AvailablePortThreads);

            int usedWorkerThreads = maxWorkerThreads - AvailableWorkerThreads;
            string usedThreadMessage = $"Thread {mtid}: Threads in Use count: {usedWorkerThreads}";
            Log.Info(usedThreadMessage);


            var taskTypeAndTasks = GetTaskListTypeAndTasks();
            var task = GetNextTask(db, taskTypeAndTasks.Key, taskTypeAndTasks.Value);

            if (_timerShutdown)
            {
                Log.Debug("Task processing stopped.");
                return false;
            }

            if (task == null)
            {
                Log.DebugFormat("DoTask: Idle in thread {0} ({1} tasks running)", mtid, _processingTaskLock);
                return false;
            }

            Log.DebugFormat("DoTask: starting task {2}:{0} on thread {1}", task.Id, mtid, task.Class);
            System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task).ContinueWith(c => DoTask());
            Log.DebugFormat("DoTask: done ({0})", mtid);

            return true;
        };
        return DbExtensions.WithDbWrite(ctx => a(ctx));
    }
Theodor Zoulias
Jack-of-some
  • **Worth noting:** If you're CPU-bound, the optimal number of threads is the same as *the number of processor cores you have.* https://stackoverflow.com/q/1718465. Adding more threads than that will not improve performance. – Robert Harvey Jun 28 '22 at 21:46
  • You could try adding `ThreadPool.SetMinThreads(1000, 1000);` at the start of the program, and see if it makes any difference. This is suggested for gathering information about the causes of the observed behavior. It's not suggested as a fix. I don't recommend adding this code in a production environment. – Theodor Zoulias Jun 28 '22 at 22:28
  • Btw if your project contains a custom class named `Task`, and for this reason you can't import the namespace `System.Threading.Tasks` and you have to use the full name of the [`System.Threading.Tasks.Task`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task) every time you use it, you are in a world of pain. – Theodor Zoulias Jun 28 '22 at 22:38
  • Please show all your relevant code – Charlieface Jun 28 '22 at 22:55
  • @TheodorZoulias I tried running with that. It was kinda odd. The reported number of used threads still hit 4, but a lot less than before. It was mostly 2. At the same time, the total number of thread ids reported in the log was more like 15 instead of 50. And each ID reported doing more tasks on average than before. Seems like the extra threads all went to the TPL tasks, and not the OnTimerGo threads. oh... and yeah, the codebase is really bad. Old school legacy monolith type thing. Lots of anti-patterns... – Jack-of-some Jun 29 '22 at 00:45
  • @Charlieface more code added. – Jack-of-some Jun 29 '22 at 00:46
  • @RobertHarvey yeah I have seen that. But this box has 16 vCPU's. So shouldn't it go up to at least 16? There is nothing other than the OS on it (windows). – Jack-of-some Jun 29 '22 at 00:46
  • Could you verify that the `ThreadPool.SetMinThreads(1000, 1000);` returned `true`? If the numbers are too high for your system, it might return `false`. In that case try with something smaller, like `ThreadPool.SetMinThreads(500, 500);`. – Theodor Zoulias Jun 29 '22 at 02:37
  • _My reading of this is that a new thread should start running..._ Your reading is wrong. :) Tasks are not Threads. – Wyck Jun 29 '22 at 03:36
  • @TheodorZoulias Verified it returned true – Jack-of-some Jun 29 '22 at 04:08
  • @Wyck Is that true for both the timer and the new task? I knew the new task thing doesn't always start a new thread, but I thought the timer did. – Jack-of-some Jun 29 '22 at 04:09
  • Timers produce **events** that are dispatched to the dispatcher associated with the timer. A `System.Timer` will be dispatched to the dispatcher associated with the thread that created the timer (typically the main thread) and a `System.Threading.Timer` will be dispatched to a thread from the system thread pool. The latter *could* result in a thread being created if the thread pool is dry, but not necessarily (in accordance with how thread pools work). The former will not create threads at all. – Wyck Jun 29 '22 at 14:01
  • @Wyck I assume the threads in the threadpool should be covered by ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads. But those show only 2 to 4 threads ever in use at one time. But if those don't count the ones in the "pool" that might make sense. – Jack-of-some Jun 29 '22 at 21:05
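
The diagnostics discussed in the comments above can be collected in one place. This is only a sketch: `PoolStats` and its methods are hypothetical names, not part of the code in the question. It relies only on `ThreadPool.GetMinThreads`, `GetMaxThreads`, `GetAvailableThreads` and `SetMinThreads`, which are all available on .NET Framework 4.8.

```csharp
using System;
using System.Threading;

// Hypothetical helper for logging thread pool state; not from the question.
public static class PoolStats
{
    // Max minus available is what the pool reports as currently active workers.
    public static int WorkerThreadsInUse()
    {
        int maxWorkers, maxIo, freeWorkers, freeIo;
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);
        ThreadPool.GetAvailableThreads(out freeWorkers, out freeIo);
        return maxWorkers - freeWorkers;
    }

    // One log line with the configured limits and the current usage.
    public static string Snapshot()
    {
        int minWorkers, minIo, maxWorkers, maxIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);
        return string.Format("min={0} max={1} inUse={2}",
            minWorkers, maxWorkers, WorkerThreadsInUse());
    }

    // Raises the worker minimum (never lowers it) and reports whether
    // the pool accepted the new value. Diagnostic use only.
    public static bool TryRaiseMinWorkers(int workers)
    {
        int minWorkers, minIo;
        ThreadPool.GetMinThreads(out minWorkers, out minIo);
        return ThreadPool.SetMinThreads(Math.Max(workers, minWorkers), minIo);
    }
}
```

Logging `PoolStats.Snapshot()` next to the existing "Threads in Use" message would show the minimum alongside the maximum. The documentation describes `GetAvailableThreads` as the maximum minus the number of currently active threads, so a low `inUse` value most likely means few work items are running at that instant, not that the pool holds few threads.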

2 Answers

The Task.Factory.StartNew by default doesn't create a new thread. It borrows a thread from the ThreadPool instead.
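
One way to check this on your own machine is to log `Thread.CurrentThread.IsThreadPoolThread` from each callback. The sketch below is illustrative only: `PoolThreadProbe`, `Describe` and `Run` are made-up names, and the 50 ms period is chosen just to keep the demo short. It starts a few tasks with `Task.Factory.StartNew`, lets a `System.Threading.Timer` tick a few times, and records which thread ran each callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe; not part of the code in the question.
public static class PoolThreadProbe
{
    private static string Describe(string source)
    {
        Thread t = Thread.CurrentThread;
        return string.Format("{0}: thread {1}, IsThreadPoolThread={2}",
            source, t.ManagedThreadId, t.IsThreadPoolThread);
    }

    public static List<string> Run()
    {
        var lines = new ConcurrentQueue<string>();

        // 1) Tasks started with default options on the default scheduler.
        var tasks = new Task[4];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () => lines.Enqueue(Describe("StartNew")),
                CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        }
        Task.WaitAll(tasks);

        // 2) Four ticks of a System.Threading.Timer; later ticks are ignored.
        int remaining = 4;
        var ticksDone = new CountdownEvent(remaining);
        TimerCallback onTick = state =>
        {
            if (Interlocked.Decrement(ref remaining) < 0) return;
            lines.Enqueue(Describe("Timer"));
            ticksDone.Signal();
        };
        using (new Timer(onTick, null, 0, 50))
        {
            ticksDone.Wait();
        }

        return lines.ToList();
    }
}
```

Printing the lines returned by `PoolThreadProbe.Run()` from a console app should show `IsThreadPoolThread=True` for every entry, often with the same few thread IDs repeated, because the pool reuses idle threads instead of creating one per tick.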

The ThreadPool is intended as a small pool of reusable threads, to help amortize the cost of running frequent and lightweight operations like callbacks, continuations, event handlers etc. Depleting the ThreadPool of available workers by scheduling too much work on it results in a situation that is called saturation or starvation. And as you've already figured out, it's not a happy situation to be in.

You can prevent the saturation of the ThreadPool by running your long-running work on dedicated threads instead of ThreadPool threads. This can be done by passing the TaskCreationOptions.LongRunning as argument to the Task.Factory.StartNew:

    _ = Task.Factory.StartNew(ProcessTask, task, CancellationToken.None,
            TaskCreationOptions.LongRunning, TaskScheduler.Default)
        .ContinueWith(t => DoTask(), CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

The above code schedules the ProcessTask(task) on a new thread, and after the invocation is completed either successfully or unsuccessfully, the DoTask will be invoked on the same thread. Finally the thread will be terminated. The discard _ signifies that the continuation Task (the task returned by the ContinueWith) is fire-and-forget. Which, to put it mildly, is architecturally suspicious.
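
To see the difference between the two kinds of task for yourself, here is a minimal sketch. `LongRunningProbe` and `Run` are hypothetical names, and the `Thread.Sleep(200)` is only a stand-in for `ProcessTask`, so that the work is still running when the continuation is attached. Strictly speaking `LongRunning` is a hint to the scheduler, so treat the result as an observation about the default scheduler and not as a guarantee.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical probe; not part of the code in the question.
public static class LongRunningProbe
{
    // Returns: [0] default task ran on a pool thread,
    //          [1] LongRunning task ran on a pool thread,
    //          [2] continuation ran on the same thread as the LongRunning work.
    public static bool[] Run()
    {
        bool defaultOnPool = false;
        bool longRunningOnPool = true;
        int workThreadId = 0;
        int continuationThreadId = 0;

        Task plain = Task.Factory.StartNew(
            () => { defaultOnPool = Thread.CurrentThread.IsThreadPoolThread; },
            CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);

        Task chained = Task.Factory.StartNew(
            () =>
            {
                longRunningOnPool = Thread.CurrentThread.IsThreadPoolThread;
                workThreadId = Thread.CurrentThread.ManagedThreadId;
                Thread.Sleep(200); // stand-in for ProcessTask
            },
            CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default)
        .ContinueWith(
            t => { continuationThreadId = Thread.CurrentThread.ManagedThreadId; },
            CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        // WaitAll also makes the writes above visible to this thread.
        Task.WaitAll(plain, chained);
        return new[] { defaultOnPool, longRunningOnPool, workThreadId == continuationThreadId };
    }
}
```

With the default scheduler, the first flag should come back `true` and the second `false`, meaning the `LongRunning` work did not occupy a `ThreadPool` worker. The third flag reports whether the continuation ran inline on that same dedicated thread.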

In case you are wondering why I pass the TaskScheduler.Default explicitly as argument to StartNew and ContinueWith, check out this link.

Theodor Zoulias
  • But what about the timer. Should it be starting a new thread every 10 seconds? – Jack-of-some Jun 29 '22 at 04:10
  • And if I am using all the threads in the pool, shouldn't ThreadPool.GetAvailableThreads be returning a much lower number? Right now I got 2k+ for both available and max. And the difference between them is never more than 4. That implies that the threadpool isn't getting starved. But at the same time, you said it was supposed to be small, and 2k+ doesn't seem small. – Jack-of-some Jun 29 '22 at 04:13
  • @Randell the [`System.Threading.Timer`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.timer) uses threads from the `ThreadPool`. You can confirm it yourself by logging the [`IsThreadPoolThread`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.thread.isthreadpoolthread) property. – Theodor Zoulias Jun 29 '22 at 04:19
  • @Randell I am not familiar with the `GetAvailableThreads` method. I find more useful the [`ThreadPool.ThreadCount`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.threadpool.threadcount) property, that returns the number of threads that are currently owned by the `ThreadPool`, independently of whether they are currently active or idle. – Theodor Zoulias Jun 29 '22 at 04:25
  • .NET Framework 4.8 doesn't seem to have the ThreadCount property. So the get methods are all I got. – Jack-of-some Jun 29 '22 at 16:48
  • @Randell Ah, I missed the *"is C# .NET Framework 4.8"* phrase in the question. I added the `.net-4.8` tag to make it more prominent. – Theodor Zoulias Jun 29 '22 at 17:37

> My reading of this is that a new thread should start running OnTimerGo every 10 seconds, and that thread should run ProcessTask in parallel on tasks as fast as it can get them from the queue.

Well, that is definitely not what's happening. There is a lot of uncertainty about your code, but it's clear that the next DoTask starts only AFTER ProcessTask completes, and that is not parallel execution. Your line of code is this:

    System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task).ContinueWith(c => DoTask());

I suggest starting another DoTask right there, like this:

    System.Threading.Tasks.Task.Factory.StartNew(ProcessTask, task);
    DoTask();

Make sure your code is ready for parallel execution, though.
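
As a rough illustration of what "ready for parallel execution" can mean, here is a sketch in which several workers drain a shared queue at once. Everything in it is an assumption for the sake of the example: `ParallelDrain` and `Run` are hypothetical names, the `ConcurrentQueue<int>` stands in for your database-backed `GetNextTask`, and the counter stands in for `ProcessTask`. The point is only that shared state is touched through thread-safe types and `Interlocked`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example; not part of the code in the question.
public static class ParallelDrain
{
    // Processes itemCount items with workerCount concurrent workers
    // and returns how many items were processed.
    public static int Run(int itemCount, int workerCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, itemCount));
        int processed = 0;

        var workers = new Task[workerCount];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Factory.StartNew(
                () =>
                {
                    int item;
                    // TryDequeue hands each item to exactly one worker.
                    while (queue.TryDequeue(out item))
                    {
                        // Stand-in for ProcessTask; Interlocked keeps the count exact.
                        Interlocked.Increment(ref processed);
                    }
                },
                CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        }

        Task.WaitAll(workers);
        return processed;
    }
}
```

Calling `ParallelDrain.Run(10000, 8)` returns the number of items processed. With a plain `processed++` in place of `Interlocked.Increment`, the count could come up short under contention, which is the kind of bug to look for before running `DoTask` concurrently.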

lilo0
  • ProcessTask has a counter that it increments using Interlocked. I can see that it gets up to 2 tasks being processed at once, but not more than that, which seems reasonable given the speed of ProcessTask. So I know it is doing parallel execution of some sort. I read up on ContinueWith, and it basically says it won't wait on ProcessTask to finish and will run DoTask asynchronously. That part seems to be working, as I have seen logs of individual threads doing hundreds of tasks. It's the timer that doesn't seem to be creating new threads when I think it should. – Jack-of-some Jun 29 '22 at 16:54