
I'm on .NET Framework 4.5.2, updating some old code, and trying to figure out how to manage the number of running threads for a specific ID. This is the code:

foreach (ThreadGroup thread_group in threadGroups)
{
    if (ImportQueues.TryGetValue(thread_group.ThreadGroupID, out BlockingCollection<ImportFrequency> configuration))
    {
        if (configuration.Count > thread_group.ThreadCount)
        {
            // increase number of threads running for this group
        }
        else if (configuration.Count < thread_group.ThreadCount)
        {
            // decrease number of threads running for this group
        }
    }
    else // Spin up the initial threads
    {
        ImportQueues.Add(thread_group.ThreadGroupID, new BlockingCollection<ImportFrequency>());

        for (int x = 0; x < thread_group.ThreadCount; x++)
        {
            Thread import_thread = new Thread(new ParameterizedThreadStart(ProcessImportQueue)) { IsBackground = true };
            import_thread.Start(ImportQueues[thread_group.ThreadGroupID]);
        }
    }
}

Essentially, we run thread_group.ThreadCount threads for each group's blocking collection, which is continually updated. Additionally, thread_group can be updated to change the ThreadCount. If the change increases the number of threads, we spin up more to process the blocking collection. If it decreases, we wait for the difference in threads to finish while the others continue running.

Is this possible with this paradigm or do I need to find a better way to manage threads?

Edit: One solution I tried is using CancellationTokens. When I start a thread, I pass in a model that contains the context as well as a CancellationToken; that model is saved to a global variable. If we need to decrease the number of threads, I go through however many need to be stopped and cancel each one's token, which breaks that thread's infinite loop and lets it exit.
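A minimal sketch of that approach, assuming the worker loops on `BlockingCollection<T>.Take` (the `ImportFrequency` placeholder, `ImportThreadContext` model, and `DecreaseThreads` helper are hypothetical names, not the actual code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

class ImportFrequency { }  // placeholder for the real model

class ImportThreadContext  // hypothetical per-thread model
{
    public BlockingCollection<ImportFrequency> Queue;
    public CancellationTokenSource Cancellation = new CancellationTokenSource();
}

static class ImportWorker
{
    public static void ProcessImportQueue(object state)
    {
        var context = (ImportThreadContext)state;
        try
        {
            while (true)
            {
                // Take() unblocks with OperationCanceledException when the
                // token is canceled, so a thread parked on an empty queue
                // still exits promptly instead of waiting for the next item.
                ImportFrequency item = context.Queue.Take(context.Cancellation.Token);
                // ... process item ...
            }
        }
        catch (OperationCanceledException)
        {
            // Cooperative shutdown: fall through and let the thread end.
        }
    }

    // Cancel the newest 'count' threads of a group.
    public static void DecreaseThreads(List<ImportThreadContext> contexts, int count)
    {
        for (int i = 0; i < count && contexts.Count > 0; i++)
        {
            contexts[contexts.Count - 1].Cancellation.Cancel();
            contexts.RemoveAt(contexts.Count - 1);
        }
    }
}
```

The key detail is passing the token into `Take` itself, so cancellation works even while the thread is blocked on an empty queue.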

Carspn
    The general recommendation is to use the thread pool rather than starting threads manually. Have you evaluated your solution to confirm that it actually brings any improvement over a simpler solution? In my experience, there is usually room to do things more efficiently rather than throwing threads at the problem. – JonasH Mar 08 '22 at 21:30
  • Thanks for the response. No, I haven't looked into it yet, I don't have a ton of flexibility from upper management to overhaul how this is working. I'll look into thread pools and see if that's a good solution for this. – Carspn Mar 08 '22 at 21:37
  • 1
    You've posted code that is the current solution to a problem that you haven't described. We can suggest to use a more idiomatic way to do this (i.e. threadpool) but without knowing what the actual problem this code solves we can't easily give you a good concrete solution. – Enigmativity Mar 08 '22 at 22:53
  • 1
    Let me put it another way. There was a problem X, and this solution Y was implemented. You've come here and clearly articulated what Y is and what it does. You've asked us is there a better way to do Y. It's very likely that there is a better way to do X. It would be great if you could add the details for X and let us solve that. I suspect that "upper management" will be blown away with your ingenuity and they would be grateful for the change. – Enigmativity Mar 08 '22 at 22:58
  • Possibly related: [Is it possible to change parallelOptions.MaxDegreeOfParallelism during execution of a Parallel.ForEach](https://stackoverflow.com/questions/3705006/is-it-possible-to-change-paralleloptions-maxdegreeofparallelism-during-execution) / [How to dynamically control Parallelism Degree based on object sizes (or some system constraints)?](https://stackoverflow.com/questions/68744296/how-to-dynamically-control-parallelism-degree-based-on-object-sizes-or-some-sys) / [Dynamically change TPL Dataflow Block .MaxDegreeOfParallelerism](https://stackoverflow.com/questions/22441338) – Theodor Zoulias Mar 09 '22 at 01:47
  • Sorry for not describing the context, I was too focused on clearly explaining the problem. The context: We have 7 groups responsible for processing different types of data models. The models are the same type but do different things. The way it's currently designed, on service start up we retrieve the thread groups, which contains the ID and number of threads to spin up. We then spin off the specified number of threads that infinitely run a method with that group's context, which will pop models off of a BlockingCollection. – Carspn Mar 09 '22 at 21:09
  • 1
    @Carspn - Can you please describe what you're trying to process without mentioning threads? I'd like to know what the business problem is, not the solution you've crafted for it. I want to understand what you're trying to do so that so that I can decide if threads, tasks, or some other solution is best. – Enigmativity Mar 09 '22 at 21:20
  • So you have 7 groups and multiple models and you need to run a method that never ends on each model? Or are you executing a tight loop on each model, calling a method in each iteration of the loop? I assume the models are fixed in number otherwise you're going to run out of memory quickly. – Enigmativity Mar 09 '22 at 21:24
    Apologies. I appreciate you taking the time to talk through this. The method as a whole is an infinite loop that takes items from a blocking collection. Each item is used to make an API request to a separate service, and we then save relevant timings (how long the request took, when it finished, etc.). It also handles any response failures from the request. – Carspn Mar 09 '22 at 21:28
  • Sorry for the confusion, the key thing here is the blocking collection. Inside the method that a thread spawns, it's `while (true) var x = collection.Take();` That collection is added to by a different part of the program, separate from anything in this question – Carspn Mar 09 '22 at 21:30
  • @Carspn could you include these clarifications / this higher-level description of the problem inside the question? A question should include all the information required in order to be answered; reading the comments should not be a prerequisite for answering it. – Theodor Zoulias Mar 09 '22 at 21:58
  • 1
    @Carspn - Thank you. We're getting closer. So the `while (true) var x = collection.Take();` pattern is a bit nasty. You're holding up a thread that's likely to not be doing any work. Threads hold ***at least 1MB of memory*** just to exist. You want to make them work all of the time (i.e. not waiting around). If they don't have work they should be let go of. I think there is plenty of scope here in making your code work much faster without the need of any of this threading code. Can you show the full code? – Enigmativity Mar 09 '22 at 22:09
  • I wrote up a simple fiddle that mimics what we're doing. It contains the key parts of the program. https://dotnetfiddle.net/cw7N7F – Carspn Mar 10 '22 at 23:23
  • @Carspn - You might as well mine Bitcoin with code like `while (true) { }`. It's going to max out your CPU. – Enigmativity Mar 11 '22 at 02:16
  • Sorry, I just used it for this example, that site times out at 10 seconds anyway. We're using a Windows Service, and that main is part of the OnStart method. – Carspn Mar 11 '22 at 04:58
  • @Carspn - How is it different in the real code? – Enigmativity Mar 11 '22 at 05:11

1 Answer


As I understand the example code, it creates N queues, with M threads processing each queue.

There are a few potential problems with this. I say potential since there might be some special circumstances that motivate such a solution, but Sturgeon's law suggests it might just be a misguided attempt to achieve some unclear goal.

  1. It is likely that N*M is larger than the number of hardware threads available, resulting either in idle threads that consume resources, or in unnecessary context switching if all threads are loaded.
  2. It is unclear if the queues are in any way different. If they are identical, why not just use a single queue?
  3. While adding threads is fairly simple, decreasing threads can be a bit complicated. Threads should only be canceled cooperatively, so you need some way to signal that a thread should stop. But if a thread is blocked waiting for items in the queue, you might run into situations where you have asked threads to stop but they have not yet done so, and you are starting threads faster than they are actually freed, eventually running out of threads. This can probably be solved, but it is unclear if the current solution does this.
  4. 'Import' sounds like something involving IO, and IO usually does not scale well with the number of threads. In the worst case, parallel IO can even harm performance.
  5. It is unclear what the actual processing is doing. In my experience, a CPU core can do an incredible amount of work if properly utilized. When things are 'slow' it usually means that the code is doing some kind of unnecessary work. Some junior programmers approach performance problems by reaching for multi-threading as the first and last step, when profiling the code and improving its efficiency can often give much greater improvements.
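One hedged way to handle the race in point 3 (threads asked to stop that have not yet exited) is to track live threads with an interlocked counter that each worker decrements on exit, and only top up the pool based on that counter. A sketch, with hypothetical names:

```csharp
using System;
using System.Threading;

class GroupThreadTracker  // hypothetical per-group bookkeeping
{
    int _live; // threads that have started and not yet exited

    public int Live => Volatile.Read(ref _live);

    // Call from the thread body: increment on entry, decrement in a
    // finally block so canceled threads are counted until they really exit.
    public void OnThreadStart() => Interlocked.Increment(ref _live);
    public void OnThreadExit() => Interlocked.Decrement(ref _live);

    // Only start enough threads to reach the target, counting threads
    // that were canceled but have not finished yet, so the pool never
    // grows faster than threads are actually freed.
    public int ThreadsToStart(int target)
    {
        int live = Live;
        return live < target ? target - live : 0;
    }
}
```

This keeps the accounting separate from cancellation: canceling a token marks a thread for removal, but the group only spawns replacements once the counter confirms the old thread is gone.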

For the simple case of a queue that needs to be processed in parallel, I would consider using a Parallel.ForEach loop over the blocking collection, using ParallelExtensionsExtras for a better partitioner. That should automatically balance the number of threads against the available hardware and CPU usage patterns for maximum throughput.
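A sketch of that idea. ParallelExtensionsExtras ships a `GetConsumingPartitioner` extension for exactly this; the built-in equivalent (available since .NET 4.5) is `Partitioner.Create` with `NoBuffering`, shown here. `QueueProcessor` and `Run` are illustrative names, not part of any library:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class QueueProcessor
{
    public static void Run(BlockingCollection<string> queue, Action<string> process)
    {
        // GetConsumingEnumerable blocks while the queue is empty and ends
        // once CompleteAdding() has been called and the queue is drained.
        // NoBuffering stops the default chunking partitioner from hoarding
        // items in per-thread buffers away from idle workers.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, process);
    }
}
```

Unlike the hand-rolled threads, Parallel.ForEach lets the thread pool decide how many workers to use, and shutting down becomes a matter of calling `CompleteAdding()` on the collection rather than canceling individual threads.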

But I would not touch working code without a good understanding of what it is trying to do and why it should be changed to something better, and preferably only after sufficient automated testing is in place to help avoid introducing new bugs. Changing code just because it is old is a terrible idea.

JonasH