Hi, I am a student intern with little to no C# experience who has been asked to take over a Windows service that uses TaskCompletionSource and BlockingCollection to implement multithreading. I am trying to optimize how the service handles its task, which is to crunch log files.

My question is: when using BlockingCollection to create a thread queue that executes WorkItems, how can you get the count of active threads in the queue? Meaning, how many items that have been submitted through EnqueueTask() are still in a running state? I don't want the count of the queue backlog, which is what _taskQ.Count returns. I want the count of active threads. I want to keep the thread count at four and only enqueue an item once a previous item is done. I don't want a backlog of items in my queue.

public class ProducerConsumerQueue
{
    public CancellationTokenSource Token { get; set; }
    private BlockingCollection<WorkItem> _taskQ;

    public ProducerConsumerQueue(int workerCount)
    {
        _taskQ = new BlockingCollection<WorkItem>();
        for (int i = 0; i < workerCount; i++)
        {
            Task.Factory.StartNew(Consume);
        }
    }

    
    public Task EnqueueTask(Action action, CancellationToken? cancelToken)
    {
        var tcs = new TaskCompletionSource<object>();
        _taskQ.Add(new WorkItem(tcs, action, cancelToken));
        return tcs.Task;
    }

    public void Consume()
    {
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    workItem.Action();
                    workItem.TaskSource.SetResult(null);
                }
                catch (OperationCanceledException ex)
                {
                    if (ex.CancellationToken == workItem.CancelToken)
                    {
                        workItem.TaskSource.SetCanceled();
                    }
                    else
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }
}

This ProducerConsumerQueue is created at service start, and its queue is reloaded at each service polling interval. I want to limit how many of these threads are spawned by tying the limit to the number of files in a thread-safe DB table. So if the thread count is 4, the number of files in the DB table will be 4. The queue shouldn't spawn additional threads or enqueue files into its queue until one file is done. To do this, I figured a simple solution would be to count the number of active threads (which would mean the number of files actively being crunched) and not add any new files until that count goes down by 1:

protected override void OnStart(string[] args)
{
    ProducerConsumerQueue = new ProducerConsumerQueue(Constants.THREAD_COUNT);
    InitializeLogging();
    PollOnServiceStart();

    _timer.Elapsed += OnElapsedTime;
    _timer.Enabled = true;
    _timer.Interval = _interval;
}

public void OnElapsedTime(object source, ElapsedEventArgs args)
{
    try
    {
        //InitializeLogging();
        Poll();
    }
    catch (Exception ex)
    {
        Logger.Error(ex.Message);
    }
}


public void PollOnServiceStart()
{
    foreach (var handler in handlers)
    {
        ProducerConsumerQueue.EnqueueTask(handler.Execute, CancellationTokenSource.Token);
    }
}
edo101
  • You're not clear about what you're using to create concurrency, never mind what you've actually tried and what _specifically_ you need help with, but see duplicates for various techniques on limiting the number of active threads – Peter Duniho Feb 05 '21 at 23:10
  • @PeterDuniho What exactly isn't clear to you? Did you not see the for loop that I use to generate parallelism? – edo101 Feb 05 '21 at 23:25
  • While you say you "want to get the active number of threads", it's clear from what you wrote, and especially in your recent edit, that what you _really_ want is to limit the number of active threads. See [XY Problem](https://meta.stackexchange.com/questions/66377/what-is-the-xy-problem). – Peter Duniho Feb 05 '21 at 23:31
  • Hi edo101. Could you edit the question and specify what you are asking for in the body of the question? Writing it only in the title is not enough IMHO. Also the title is not the right place to clarify what you are *not* asking for! – Theodor Zoulias Feb 05 '21 at 23:54
  • Oh, and while you're right that the call to `Task.Factory.StartNew()` is there after all (I overlooked it), all that does is reinforce the relevance of the duplicates. – Peter Duniho Feb 06 '21 at 00:04
  • @PeterDuniho Hmm, I don't get what you are saying about the duplicates. Btw, this is an application I inherited. I don't know much about C#. I'm just trying to optimize what already exists – edo101 Feb 06 '21 at 00:26
  • Your post reads: _"I want count of active threads"_. Ignoring for a moment the question of why you should care about that, how is the _"count of active threads"_ not the same as the `workerCount` value that is passed to the `ProducerConsumerQueue` constructor? And in spite of your protestations to the contrary, your question still includes statements like _"I want to limit how many of these threads are spawned"_ which clearly indicate that you _do_ want to limit the number of active threads, and that the duplicates _are_ in fact what you need to read. – Peter Duniho Feb 06 '21 at 00:35
  • @PeterDuniho Worker count returns the count of threads in the queue, not the current active working threads. This is why I wanted you guys to ask instead of closing and downvoting. Thanks for asking. Again, I didn't write this code and I am trying to avoid changing too much that would impact production – edo101 Feb 06 '21 at 00:39
  • How is a thread sitting there waiting for something to show up in the queue not an "active thread"? It's still a runnable thread, using up thread resources (like stack). Why doesn't the information found in the hundreds, if not thousands, of related questions, including the duplicates that seem to address the underlying goal directly, address your question? – Peter Duniho Feb 06 '21 at 00:40
  • When you say that you want the count of active threads in the queue, you mean the number of the `Action`s that have been invoked by the `workItem.Action();` command, and are still in running state? – Theodor Zoulias Feb 06 '21 at 00:42
  • @TheodorZoulias PRECISELY! – edo101 Feb 06 '21 at 00:44
  • @TheodorZoulias YES !! I want the number of actions that are still in running state! – edo101 Feb 06 '21 at 00:45
  • Would `Parallel.ForEach` do what you need? If not, what requirement does it not fulfill? _If you aren't sure, please read the docs._ – mjwills Feb 06 '21 at 00:48
  • edo101 I would suggest to add this clarification in the body of the question, because it can help people to understand better what you want to know. Cleaning up the title would also be nice, because that shouting capitalized phrase serves as a downvote-magnet IMHO. Btw it seems to me that Slugart's [answer](https://stackoverflow.com/a/66071146/11178549) is to the point, and if you can't take advantage of it then honestly you have a tough road ahead, because multithreading is difficult even for experienced C# programmers. – Theodor Zoulias Feb 06 '21 at 00:55
  • edo101 yep, I understand your situation, but I can't see how asking questions here on StackOverflow will help you move forward with your goals. You'll have to invest a lot of time writing clear questions, after researching that no duplicates exist. And your questions may still be closed against your expectations, and then you'll have to fight battles to reopen them, and so on. Maybe a better investment of your time would be to study some well structured resource about multithreading, like Joseph Albahari's [online book](http://www.albahari.com/threading/ "Threading in C#"). – Theodor Zoulias Feb 06 '21 at 01:05
  • `I will use that number and set it in an if condition where if the count of active actions is less than thread count constant, then add an amount of file that is equal to the difference between the number of active actions and thread count` So you are manually trying to limit the number of threads? Don't do that. Use `Parallel.ForEach` instead. Set `MaxDegreeOfParallelism` to the number of threads you want to limit to. – mjwills Feb 06 '21 at 01:06
  • You are frustrated, I get that. But I suggested you look at that _over an hour ago_. So you can see how we might be a little frustrated also. `If nothing else it looks like I have some reading to do as well. ` Yes. Note also that that suggestion is **directly covered in the duplicate that you claim isn't relevant**. – mjwills Feb 06 '21 at 01:11
  • No problem I will read up on it. I assume this method means rewriting a lot of threading logic? If this is the case... does this parallel foreach ensure that a limit of 4 threads means only 4 active tasks in any situation? And if so, how does a task know when to start a new job (AKA: pick up a new file to process). The current design uses a meta table to track which files are being crunched. Anyways I will be reading up on it. I just wonder if it will be able to work how I need it to work @mjwills – edo101 Feb 06 '21 at 01:16
  • `And if so, how does a task know when to start a new job` When one finishes, it starts a new one. It is similar to a normal `foreach` loop, but with multiple iterations happening in parallel. – mjwills Feb 06 '21 at 01:20
  • `Parallel.ForEach` has some quirks with `BlockingCollection` so I have added some duplicate links for those as well. Also read https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/ . – mjwills Feb 06 '21 at 01:32
  • @mjwills Where did you add these duplicate links? – edo101 Feb 06 '21 at 03:33
  • They should be up the top. – mjwills Feb 06 '21 at 10:08
  • @mjwills I had to break from this task. Now I'm back. I should mention again I am new to C# and I am not even sure with my brief reading if parallel foreach works for non console applications. Part of the issue is the way the service is designed, we need it to get a new file after it processes a file. I suppose I need to rewrite the way in which we get new files – edo101 Feb 19 '21 at 01:16
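
The `Parallel.ForEach` suggestion from the comments above can be sketched roughly as follows. This is a minimal illustration, not the service's actual code: `ParallelCrunch`, `Run`, and the peak-tracking logic are hypothetical names added only to make the limit observable, and the `process` delegate stands in for whatever crunches one file. `Parallel.ForEach` lives in `System.Threading.Tasks` and is not tied to console applications.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original service.
public static class ParallelCrunch
{
    // Runs `process` once per file, with at most `maxParallel` calls in flight.
    // Returns the highest number of simultaneous calls observed during the run.
    public static int Run(IEnumerable<string> files, int maxParallel, Action<string> process)
    {
        int running = 0;
        int peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // Blocks the calling thread until every file has been processed.
        // When one iteration finishes, the loop starts the next pending file.
        Parallel.ForEach(files, options, file =>
        {
            int now = Interlocked.Increment(ref running);
            lock (gate)
            {
                if (now > peak) peak = now;
            }
            try
            {
                process(file);
            }
            finally
            {
                Interlocked.Decrement(ref running);
            }
        });

        return peak;
    }
}
```

With `maxParallel` set to 4, the returned peak should never exceed 4, whatever the number of files. Note that the call blocks until all files are done, so a service would probably run it on a background thread rather than inside the timer callback; that placement is an assumption, not something the thread settles.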

1 Answer

If you want a thread-safe counter to check the number of active tasks you can use an int _counter field and Interlocked.Increment(ref _counter)/Interlocked.Decrement(ref _counter).

Remember to increment as the first line in a try block and decrement in a finally block, so that neither call is skipped if an exception is raised.

https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-5.0

https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.decrement?view=net-5.0
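
A minimal sketch of that counter, assuming a trimmed-down queue that stores plain `Action`s. `CountingQueue` is a hypothetical class, not the question's `ProducerConsumerQueue`; the name `ActiveTasksCount()` is borrowed from the asker's later comment, and the `TaskCompletionSource` and cancellation handling are left out to keep the focus on the counter. Here the increment sits immediately before the `try`, so the `finally` can only run after a successful increment.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified variant of the question's queue.
public class CountingQueue
{
    private readonly BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
    private int _activeCount; // only accessed through Interlocked / Volatile

    public CountingQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
        {
            Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
        }
    }

    // Number of actions that have started and have not finished yet.
    public int ActiveTasksCount()
    {
        return Volatile.Read(ref _activeCount);
    }

    public void Enqueue(Action action)
    {
        _taskQ.Add(action);
    }

    // Lets the worker loops exit once the queue has drained.
    public void CompleteAdding()
    {
        _taskQ.CompleteAdding();
    }

    private void Consume()
    {
        foreach (Action action in _taskQ.GetConsumingEnumerable())
        {
            Interlocked.Increment(ref _activeCount);
            try
            {
                action();
            }
            catch (Exception)
            {
                // The real queue would report this through its TaskCompletionSource.
            }
            finally
            {
                // Runs on success, failure, or cancellation alike.
                Interlocked.Decrement(ref _activeCount);
            }
        }
    }
}
```

The count only covers items a worker has already picked up; items still waiting in `_taskQ` are not included.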

Slugart
  • Oh God, I don't know anything about this. I'll have to read into it. Why would I increment and decrement? – edo101 Feb 05 '21 at 22:31
  • Increment when you start processing a task, decrement when you finish. The count at any point in time will be the number of active tasks. That's what you ask for in the question. – Slugart Feb 05 '21 at 22:33
  • This will go in this class yeah? A new property for the class? – edo101 Feb 05 '21 at 23:05
  • A field of the class, yes. – Slugart Feb 06 '21 at 07:14
  • I figured it out! Thanks for your help. I am now testing it to see how it works. Your idea, if it works after thorough testing, makes my life much easier as I don't have to rewrite the threading logic! – edo101 Feb 18 '21 at 06:06
  • Update: this method works 99% of the time! I suppose that due to the asynchronous nature of threads, every now and then I get 1 extra file added to the processing queue. By that I mean that before one thread can completely dispose of its task, another task tries to start. I suppose I need to add a small delay after each check, to give a thread a little time to check the active thread count again before it tries starting a new task. My question now is: why did we use this Interlocked increment instead of a normal variable that you'd increment, such as int counter = 0, counter += 1? – edo101 Feb 18 '21 at 13:01
  • Interlocked.Increment is used because the counter will be read and updated by multiple threads, so the operation needs to be thread safe. counter += 1 is not thread safe. – Slugart Feb 19 '21 at 07:14
  • Thanks for replying. Yeah, unfortunately, your nice solution breaks because of the service threading. There are two versions of the service. In the first, the service only looks up files from a DB table and loads them. This is more straightforward and this solution works all the time. In the second, the service has to look for files in 3 giant folders on a network cluster. This takes time to do. Something about doing that throws off the active task count and I end up queuing up more files than I should: "if (ProducerConsumerQueue.ActiveTasksCount() < Constants.THREAD_COUNT)" – edo101 Feb 19 '21 at 21:18
  • That if condition works for the first straightforward version of the service. But doesn't work for the second for what I suppose is mistimings between the threads caused by the lag of looking for new files in the netapps cluster, so I end up with files queued up beyond the 4 thread amount – edo101 Feb 19 '21 at 21:19
  • Update: it now works. I needed to edit the original developer's main service thread. It was very inefficient, but your logic came in clutch. Thank you very much! – edo101 Mar 05 '21 at 05:09
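
The thread does not say what the final fix looked like. One plausible cause of the over-queuing described in the comments above is that a check such as `ActiveTasksCount() < THREAD_COUNT` followed by an enqueue is not atomic, and a counter that rises only when a worker starts an item does not yet include items that are queued but not started. The sketch below is one way to close that gap, under those assumptions: `SlotLimiter`, `TryReserve`, `Release`, and `Guard` are hypothetical names, and a slot is reserved at enqueue time rather than at start time.

```csharp
using System;
using System.Threading;

// Hypothetical helper: hands out at most `limit` slots at any moment.
public sealed class SlotLimiter
{
    private readonly int _limit;
    private int _inFlight; // queued + running items

    public SlotLimiter(int limit)
    {
        _limit = limit;
    }

    public int InFlight
    {
        get { return Volatile.Read(ref _inFlight); }
    }

    // Atomically claims a slot. Returns false when all slots are taken.
    public bool TryReserve()
    {
        while (true)
        {
            int current = Volatile.Read(ref _inFlight);
            if (current >= _limit)
            {
                return false;
            }
            // Succeeds only if no other thread changed the value in between.
            if (Interlocked.CompareExchange(ref _inFlight, current + 1, current) == current)
            {
                return true;
            }
        }
    }

    public void Release()
    {
        Interlocked.Decrement(ref _inFlight);
    }

    // Wraps `work` so the slot is given back when the work ends, even on failure.
    public Action Guard(Action work)
    {
        return () =>
        {
            try
            {
                work();
            }
            finally
            {
                Release();
            }
        };
    }
}
```

A polling loop would then call `TryReserve()` before each `EnqueueTask(...)`, pass `limiter.Guard(handler.Execute)` as the action, and stop adding files as soon as `TryReserve()` returns false.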