
Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are to be processed by parallel Tasks that dequeue them. However, I run into issues where, after some time, I start getting tasks that dequeue the same file at the same time (which leads to "used by another process" errors on the file). And I also get more tasks than are supposed to be allocated. I have even seen 8 tasks running at once, which should not be happening; the active tasks limit is 5.

Rough code:

private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}

The _signalParseQueuedFilesEvent is set on a timer in a Windows Service. The above function then calls SetParsersTask. This is why I use a ConcurrentDictionary to track how many active tasks there are, and to make sure they stay below _ActiveTasksLimit:

private void SetParsersTask()
{
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) // concurrent dictionary used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock(_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }

}

That then calls this function, which dequeues the ConcurrentQueue:

private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out fileToProcess);
    if (!string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}

I even put a lock on _ConcurrentqueuedTdxFilesToParse, even though I know it doesn't need one, all to make sure that I never run into a situation where two Tasks dequeue the same file.

This function is where I add and remove Tasks as well as launch the file parser for the dequeued file:

private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
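    // Block here so this method doesn't return until the parser task has finished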
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}

Can you guys help me understand why I am getting the same file dequeued in two different Tasks? And why I am getting more Tasks than the _ActiveTasksLimit?

Datboydozy
  • Please re-read [mre] guidance on posting code and [edit] the question accordingly. I.e. put hardcoded sample values for all collections in the question (make sure the issue still exists with those values), and show the types of all involved properties (like `_ConcurrentqueuedTdxFilesToParse`, `_activeParserTasksDict`). Also clarify why you use `WaitAll` and why you're mixing `Parallel.For` and async methods (which, as you already researched, do not really work together, and you know that you need to use *Async versions of those). – Alexei Levenkov Oct 10 '22 at 16:57
  • @AlexeiLevenkov I used WaitAll since it means the task I started will be finished before the calling function exits. I am not calling an async Task, so to me it made sense to use Parallel.For since I am launching synchronous tasks? I also mentioned in my explanation that I am using a ConcurrentDictionary and a ConcurrentQueue. To stop my question from being too long, I omitted their declarations, as they would again make my question way too big – Datboydozy Oct 10 '22 at 17:03
  • Related question (deleted): [Why are my parallel Tasks running the Same job and Going above allocated Task amount?](https://stackoverflow.com/questions/74016845/why-are-my-parallel-tasks-running-the-same-job-and-going-above-allocated-task-am) – Theodor Zoulias Oct 10 '22 at 17:05
  • @TheodorZoulias Yes, I deleted it to write a better-edited question that is less verbose. – Datboydozy Oct 10 '22 at 17:06
  • I started writing an answer to your previous question, and you deleted it before I had a chance to submit my answer. That's quite disappointing. – Theodor Zoulias Oct 10 '22 at 17:10
  • @TheodorZoulias I am very sad to hear that. I was under the impression my first question was way too verbose and long, with unnecessary info. Could you please post your solution here? I know S/O is strict with questions, so I wanted to do my best to make it easier on readers to see what my issue was. I would really appreciate it if you could post here. I am new to threading and C#, so I am stuck. Thanks!! – Datboydozy Oct 10 '22 at 17:13
  • @TheodorZoulias I do want to apologize again. Please, any help you can give would be very appreciated, even if it's some concepts you can point me to. I got worried my question was too much to comprehend by S/O users, so I deleted it to make a shorter one – Datboydozy Oct 10 '22 at 17:30
  • @Datboydozy As mentioned in the first comment, if you provide a [mre] (MRE) it makes it far easier to reproduce and help solve the problem. At a glance there are parts of your code that are not thread safe and perhaps two tasks for the same file are created because of how you create the queue and not how you dequeue from it. An MRE might even help you realize some issues. – Xerillio Oct 10 '22 at 17:41
  • @Xerillio as this question has been up for some hours now, do I edit it and hope someone looks at it? Or do I delete it and write a new question with hopefully an MRE? It's really hard to write a proper MRE when you're dealing with threading, and I can only get so many downvotes before I am banned. – Datboydozy Oct 10 '22 at 18:29
  • @Datboydozy It hasn't been 2 hours yet, which is really not a long time. Always [edit] the question to avoid time spent by others going to waste repeating the same comments and, in Theodor's case, losing a possible answer. Deleting and reposting is not much different from cross-posting or posting duplicates either. – Xerillio Oct 10 '22 at 18:40
  • @Xerillio This code runs in a service; how can I create an MRE with a service and threading with the classes I am using without writing an overly long submission? I need to write all the code to show exactly what I am dealing with. Then it would be too long. Please help, I am not very knowledgeable with coding, and how to scale such a question to S/O – Datboydozy Oct 10 '22 at 18:56
  • Not to mention there are DB components to the service, where I log my files and where I read and update them from to make it thread safe – Datboydozy Oct 10 '22 at 18:58
  • @Datboydozy Making an MRE is not only useful when posting a question but equally useful for you to isolate where the problem occurs. Try removing parts/components one at a time (e.g. replace a DB query with a simple hard coded `List`) and test your code until you think you cannot remove more while still reproducing the issue. If you still believe the example is too large for a question here, you can post the most important snippets and give a full example e.g. on [DotNetFiddle](https://dotnetfiddle.net/) or a small GitHub repo. – Xerillio Oct 10 '22 at 19:50
  • @Xerillio thank you so much for the tips, I will do better on my next issue. – Datboydozy Oct 10 '22 at 22:55

2 Answers


There are a number of red flags in this¹ code:

  1. Using a WaitHandle. This tool is too primitive. I've never seen a problem solved with WaitHandles that couldn't be solved in a simpler way without them.
  2. Launching Task.Run tasks in a fire-and-forget fashion.
  3. Launching a Parallel.For loop without configuring the MaxDegreeOfParallelism. This practically guarantees that the ThreadPool will get saturated.
  4. Protecting a queue (_queuedTdxFilesToParse) with a lock (_concurrentQueueLock) only partially. If the queue is a Queue<T>, you must protect it on each and every operation, otherwise the behavior of the program is undefined. If the queue is a ConcurrentQueue<T>, there is no need to protect it because it is thread-safe by itself.
  5. Calling Task.Factory.StartNew and Task.Start without configuring the scheduler argument (a sketch showing points 3 and 5 configured explicitly follows this list).
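
For illustration only, here is roughly what explicit configuration could look like; the DoWork method and the limit of 5 are hypothetical placeholders, not code from the question:

using System;
using System.Threading;
using System.Threading.Tasks;

// DoWork is a stand-in for real per-item work.
void DoWork(int i) => Console.WriteLine($"Working on {i}");

// Point 3: cap the parallelism instead of relying on the unlimited default.
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
Parallel.For(0, 100, options, i => DoWork(i));

// Point 5: pass the scheduler explicitly instead of the ambient default.
var task = Task.Factory.StartNew(() => DoWork(-1),
    CancellationToken.None,
    TaskCreationOptions.DenyChildAttach,
    TaskScheduler.Default);
task.Wait();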

So I am not surprised that your code is not working as expected. I can't point to a specific error that needs to be fixed. For me the whole approach is dubious, and needs to be reworked/scrapped. Some concepts and tools that you might want to research before attempting to rewrite this code:

  1. The producer-consumer pattern.
  2. The BlockingCollection<T> class (a minimal producer-consumer sketch follows this list).
  3. The TPL Dataflow library.
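
For example, a bare-bones producer-consumer setup with a BlockingCollection<T> might look like this; the file names and the consumer count of 5 are made-up values mirroring the question's limit:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var queue = new BlockingCollection<string>(boundedCapacity: 100);

// Producer: enqueue the work, then signal that no more items are coming.
var producer = Task.Run(() =>
{
    foreach (var file in new[] { "a.tdx", "b.tdx", "c.tdx" })
        queue.Add(file);
    queue.CompleteAdding();
});

// Exactly 5 consumers. GetConsumingEnumerable hands each item to exactly
// one consumer, and blocks while the collection is temporarily empty.
var consumers = Enumerable.Range(0, 5).Select(_ => Task.Run(() =>
{
    foreach (var file in queue.GetConsumingEnumerable())
        Console.WriteLine($"Processing {file}"); // stand-in for the parser
})).ToArray();

producer.Wait();
Task.WaitAll(consumers);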

Optionally you could consider familiarizing yourself with asynchronous programming. It can help at reducing the number of threads that your program uses while running, resulting in a more efficient and scalable program. Two powerful asynchronous tools are the Channel<T> class and the Parallel.ForEachAsync API (available from .NET 6 and later).
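
A rough sketch of how those two tools could be combined, assuming .NET 6+ (the file names and delay are placeholders for real asynchronous parsing):

using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(100);

// Producer: write the work items, then complete the channel.
var producer = Task.Run(async () =>
{
    foreach (var file in new[] { "a.tdx", "b.tdx", "c.tdx" })
        await channel.Writer.WriteAsync(file);
    channel.Writer.Complete();
});

// Consume with at most 5 concurrent asynchronous operations.
await Parallel.ForEachAsync(
    channel.Reader.ReadAllAsync(),
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    async (file, ct) => await Task.Delay(100, ct)); // stand-in for async work

await producer;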

¹ This answer was intended for a related question that is now deleted.

Theodor Zoulias
  • Thank you for writing that out. I am sorry I deleted the original question. I thought I was making the right decision to make it shorter and hopefully easier for people to see what my code is doing. – Datboydozy Oct 10 '22 at 19:10
  • Why would I need to specify max degree of parallelism if I am tracking active tasks with a ConcurrentDictionary? That dictionary is supposed to give real-time data on how many tasks I have running, by adding active tasks and removing them once they are done – Datboydozy Oct 10 '22 at 19:11
  • @Datboydozy the default `MaxDegreeOfParallelism` is -1, which means unlimited parallelism. This means that the `Parallel.For` will use all the threads that are currently available in the `ThreadPool`, and will ask for more. If the `ThreadPool` has 4 threads, the parallelism will be 4. If it has 100, the parallelism will be 100. If you have two unconfigured `Parallel` loops running concurrently, they will fight with each other. This is unlikely to be the behavior that you want. Surprisingly Microsoft [still believes](https://github.com/dotnet/runtime/issues/72981) that the default (-1) is OK. – Theodor Zoulias Oct 10 '22 at 19:28
  • So even though I am tracking active tasks through a concurrent dictionary and using that dictionary's count of tasks to limit how many files I enqueue, I will still get more threads than I wanted because of this? – Datboydozy Oct 10 '22 at 20:14
  • I have looked but haven't found a good answer yet: how is the Parallel.ForEachAsync better than a normal Parallel.For? – Datboydozy Oct 10 '22 at 20:22
  • @Datboydozy the `Parallel.For(0, parserCountToStart)` will do a parallel loop between `0` and `parserCountToStart`, using all the available threads in the `ThreadPool`. If the `parserCountToStart` is larger than the available threads, the `ThreadPool` will become saturated. All these threads will execute concurrently the code that you have inside the loop. If this code includes internal synchronization/throttling, then many/most of these threads will be blocked. With the `ThreadPool` saturated, any other work that requires a thread from the `ThreadPool`, like the `Task.Run`, will stagnate. – Theodor Zoulias Oct 10 '22 at 22:30
  • @Datboydozy the `Parallel.ForEachAsync` parallelizes asynchronous actions, and when an operation is asynchronous then for the most part [it's not using any thread](https://blog.stephencleary.com/2013/11/there-is-no-thread.html). So you can have hundreds of asynchronous operations running concurrently on a handful of threads. That's the benefit of going async. – Theodor Zoulias Oct 10 '22 at 22:34
  • I couldn't edit my question, but I meant to ask "how is the Parallel.ForEachAsync different from a normal Parallel.For?" But maybe you answered it just now. I am very new to programming and C#. Is there a place I can go to read up on the difference between Asynchronous and Synchronous and how it relates to Tasks and async Tasks and Parallel.Fors? The more I've looked at questions, the less sure I've become about what sync and async are anymore. And your answers here almost make it seem like it's always better to stick with async methods so as to not use up threads? You've given me a lot of questions haha – Datboydozy Oct 10 '22 at 23:01
  • @Datboydozy asynchronous usually means "without using the current thread" or "without using any threads". I would say that it's not a very well defined term. Some people use it interchangeably with the terms parallel and concurrent. You can find good readings about asynchronous programming in [Stephen Cleary's blog](https://blog.stephencleary.com/2012/02/async-and-await.html). You could also consider learning multithreading systematically, with either a book or online material like this: [Threading in C#](http://www.albahari.com/threading/) by Joseph Albahari. – Theodor Zoulias Oct 10 '22 at 23:14
  • Got it thanks. I got a "solution" posted but on each bulk run I do where I try to queue up 1000 files, I still get one file that gets queued up across two tasks. Before this solution this was happening a lot. Like at least 20 to 30 times each bulk run. Now it only happens 1 or 2 times each bulk run. Can you check my answer and see if there is something I'm missing? I feel like putting the Task in the normal for loop should be enough. But it's not? – Datboydozy Oct 11 '22 at 00:11
  • @Datboydozy to be honest I don't think that your current code is salvageable. Rewriting it correctly from scratch could be easier, than trying to fix multithreaded code that was written without deep understanding of the concepts. I'm not trying to disappoint or offend you. I would do the same with my code. I have written terrible multithreaded code in the past, sprinkling `lock`s here and there without knowing what I was doing. It was working correctly most of the time, until it didn't, in which case nobody could explain the weird exceptions that were popping at the most unfortunate moments. – Theodor Zoulias Oct 11 '22 at 01:04
  • Got it. So I took out the parallel for in my code, so it's just tasks now inside a normal for loop. The only other weird thing I've seen is that some of the processes do get stuck after they are launched by the Task. Is this related to my multithreading code, or is it more on the fact that I am launching Process items from Tasks? For the record, I briefly looked at the producer-consumer thing/blocking collections and it seems more complicated than simple Tasks? I actually thought I was doing the right thing keeping multithreading simple with Tasks and loops – Datboydozy Oct 11 '22 at 10:14
  • @Datboydozy tasks are simple, but when you have many of them it's not so simple to coordinate them. The `BlockingCollection` is not comparable to a `Task`. It is a queue with blocking capabilities that can be used for propagating messages from a producer `Task` to a consumer `Task`. You can think of it as a `ConcurrentQueue` with extra capabilities. The consumer `Task` can simply enumerate the `GetConsumingEnumerable()`, without having to worry if the collection is temporarily empty. While it is empty, the enumeration blocks. It unblocks automatically when a new message is available. – Theodor Zoulias Oct 11 '22 at 11:03
  • Thanks again. If I use a blocking collection, how would that be different from using a concurrent Queue, if the idea behind them is about the same? I still need to run multiple tasks/threads to read from the collection and make sure that the files are being used uniquely. And I also need to control how many of them I am using at once. Essentially the base structure of what I need is a collection that I can add files to, then Tasks that can launch and take files from that collection in a thread-safe, unique manner. That's why I saw a simple concurrent queue dequeued by parallel threads as sufficient? – Datboydozy Oct 11 '22 at 12:51
  • @Datboydozy the `BlockingCollection` is just a conveyor belt, not a processor. If you are looking for a queue that also processes the messages it receives, take a look at the [`ActionBlock`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) component from the TPL Dataflow library. It can be configured with `MaxDegreeOfParallelism` (the default is 1), and with `BoundedCapacity` (the default is unlimited) to force the producer to wait when the queue is full. And it has a `Completion` property so that you know when everything is done. – Theodor Zoulias Oct 11 '22 at 17:06
  • Thanks again. Going back to your answer, for point 1: you said you haven't seen a problem that can't be solved by something other than a WaitHandle. I use it to ensure that the functions enclosed inside said handle don't get to run again unless the tasks inside said handle are done. What else could I use, besides some sort of signal, to ensure a set of tasks is done before it's queued up to run again? – Datboydozy Oct 12 '22 at 17:17
  • @Datboydozy if you use the aforementioned higher-level tools, you don't have to handle the signaling manually because they are doing it internally for you. Also if you want to start tasks sequentially the one after the other, you can keep a reference to the previous task, and the next task can `await` the previous one before doing its main work. You can do a lot of cool things if you keep track of your tasks, instead of firing-and-forgetting them. – Theodor Zoulias Oct 12 '22 at 17:49
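
As a minimal sketch of the ActionBlock setup described in the comments above (the handler and file names are hypothetical stand-ins; requires the System.Threading.Tasks.Dataflow package):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var parserBlock = new ActionBlock<string>(
    file => Console.WriteLine($"Parsing {file}"), // stand-in for the parser
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5, // at most 5 files processed at once
        BoundedCapacity = 100       // SendAsync waits while the queue is full
    });

foreach (var file in new[] { "a.tdx", "b.tdx", "c.tdx" })
    await parserBlock.SendAsync(file);

parserBlock.Complete();       // signal that no more files are coming
await parserBlock.Completion; // wait until every queued file is processed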

So I fixed my problem. The solution was first to not add more parallelism than need be. I was trying to create a situation where private void SetParsersTask() would not be held up by tasks that still needed to finish processing a file. So I foolishly threw in Parallel.For in addition to Task.Start, which is already parallel. I fixed this by generating fire-and-forget Tasks in a normal for loop as opposed to Parallel.For:

private void SetParsersTask()
{
    if (_queuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _tdxParsersInstanceCount)
        {
            int parserCountToStart = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
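            // Rebuild the queue without duplicate entries before starting new parser tasks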
            _queuedTdxFilesToParse = new ConcurrentQueue<TdxFileToProcessData>(_queuedTdxFilesToParse.Distinct());
            for (int i = 0; i < parserCountToStart; i++)
            {
                Task.Run(() => PrepTdxParser());
            }
            
        }
    }

}

After that I was still getting the occasional duplicate file, so I moved the queue loading to another long-running thread. For that thread I use an AutoResetEvent, so that the queue is only populated once at any instant in time, as opposed to potentially having another task load it with duplicate files. It could be that both my enqueue and my dequeue were responsible, and now it's addressed:

var _loadQueueTask = Task.Factory.StartNew(() => LoadQueue(), TaskCreationOptions.LongRunning);

private void LoadQueue()
{
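    // Gated by an AutoResetEvent: a single long-running thread does all of the enqueuing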
    while (_loadConcurrentQueueEvent.WaitOne())
    {
        if (_queuedTdxFilesToParse.Count < _tdxParsersInstanceCount)
        {
            int numFilesToGet = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
            var filesToAdd = ServiceDBHelper.GetTdxFilesToEnqueueForProcessingFromDB(numFilesToGet);
            foreach (var fileToProc in filesToAdd)
            {
                ServiceDBHelper.UpdateTdxFileToProcessStatusAndUpdateDateTime(fileToProc.TdxFileName, 1, DateTime.Now);
                _queuedTdxFilesToParse.Enqueue(fileToProc);
            }

        }
    }
}

Thanks to Theo for pointing me to additional tools and making me look closer at my parallel loops.

Datboydozy
  • You shouldn't be calling `Task.Run(() => PrepTdxParser());` without hanging on to a reference to the task to later ensure it is completed. That's the "fire-and-forget" Theo talked about. – Enigmativity Oct 11 '22 at 01:10
  • @Enigmativity I added my PrepTdxParser function. Inside it, I wait for the actual parser task to finish. Should I move the wait inside the for loop? It would just prevent the for loop from moving to another Task. So that's why I did a fire-and-forget – Datboydozy Oct 11 '22 at 10:19
  • It's now been fixed. Needed to also move my enqueuing function to another thread and have the queue populated only once at any instant, instead of multiple times at the same time, which causes the possibility of duplicates @Enigmativity – Datboydozy Oct 11 '22 at 15:15