Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are processed by parallel Tasks that dequeue them. However, after some time I start getting Tasks that dequeue the same file at the same time, which leads to "used by another process" errors on the file. I also get more Tasks than are supposed to be allocated: I have seen 8 Tasks running at once, even though the active task limit is 5.
Rough code:
private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}
The _signalParseQueuedFilesEvent is set on a timer in a Windows Service. The above function then calls SetParsersTask. This is why I use a ConcurrentDictionary to track how many Tasks are active and to make sure the count stays below _ActiveTasksLimit:
private void SetParsersTask()
{
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) // ConcurrentDictionary of Tasks, used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock (_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }
}
Each of those Tasks then calls this function, which dequeues from the ConcurrentQueue:
private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out fileToProcess);
    if (!string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}
I even put a lock around _ConcurrentqueuedTdxFilesToParse, even though I know it doesn't need one, all to make sure that I never run into a situation where two Tasks dequeue the same file.
This function is where I add and remove Tasks, and where I launch the file parser for the dequeued file:
private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}
Can you help me understand why the same file is being dequeued in two different Tasks, and why I am getting more Tasks than _ActiveTasksLimit?
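For reference, this is roughly the behavior I am after, written as a minimal standalone sketch rather than my actual service code. It assumes a fixed pool of workers that each loop on TryDequeue until the queue is empty, so no new Tasks are started after the initial ones. BoundedQueueDemo, Run, ProcessFile, and the generated file names are hypothetical stand-ins (ProcessFile takes the place of ConfigureAndStartProcess), and the counters exist only to check the two properties I care about.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // Assumed limit, mirroring _ActiveTasksLimit in the service.
    private const int ActiveTasksLimit = 5;

    public static (int processed, int distinct, int maxConcurrent) Run(int fileCount)
    {
        // Hypothetical file names standing in for the rows loaded from the database.
        var queue = new ConcurrentQueue<string>(
            Enumerable.Range(0, fileCount).Select(i => $"file{i}.tdx"));
        var seen = new ConcurrentDictionary<string, byte>();
        int processed = 0, active = 0, maxActive = 0;

        // Start exactly ActiveTasksLimit workers; each keeps dequeuing until the queue is empty.
        Task[] workers = Enumerable.Range(0, ActiveTasksLimit).Select(_ => Task.Run(() =>
        {
            while (queue.TryDequeue(out string file))
            {
                int now = Interlocked.Increment(ref active);

                // Record the highest number of workers seen processing at the same time.
                int prev;
                do { prev = Volatile.Read(ref maxActive); }
                while (now > prev &&
                       Interlocked.CompareExchange(ref maxActive, now, prev) != prev);

                ProcessFile(file);
                seen.TryAdd(file, 0);               // one entry per distinct file name
                Interlocked.Increment(ref processed);
                Interlocked.Decrement(ref active);
            }
        })).ToArray();

        Task.WaitAll(workers);
        return (processed, seen.Count, maxActive);
    }

    // Hypothetical stand-in for ConfigureAndStartProcess: just simulates some work.
    private static void ProcessFile(string fileName) => Thread.Sleep(5);

    public static void Main()
    {
        var (processed, distinct, maxConcurrent) = Run(50);
        Console.WriteLine(
            $"processed={processed}, distinct={distinct}, maxConcurrent={maxConcurrent}");
    }
}
```

In this sketch the processed and distinct counts should match the number of queued files, meaning each file went to exactly one worker, and maxConcurrent cannot exceed ActiveTasksLimit because only that many Tasks are ever created. That is the guarantee I expected from my code above but am not getting.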