
I have 6 million small files (average size about 15 bytes) that I need to read and then process further. I previously implemented this using Task.Factory, and on ASP.NET Core 2.1 it worked without problems, taking about 20 hours.

I have now migrated the app to ASP.NET Core 6, and on the test server my web application stops responding to any requests after these file operations start. In the log I see a System.OutOfMemoryException.

I suppose my implementation is far from ideal. What multi-threaded approaches to this work can you suggest?

Method ImportSignatures from controller:

[HttpPost("ImportSignatures")]
public JsonResult ImportSignatures()
{
    try
    {
        return Json(SignatureImportService.ImportSigningCerts());
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

Method ImportSigningCerts:

public static ImportSigningCertsResult ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    totalSignatures = 0;
    processedSignatures = 0;

    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";
    var toDelete = new List<string>();

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            List<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig").ToList();
            totalSignatures = signatures.Count;

            Store mainStore = StoreMan.GetStore("Main");
            var importStats = new ImportStats();
            var tasks = new List<Task>();

            int saveIndex = 1;
            const int processedForSave = 100000; // Number of processed signatures after which to do an intermediate store save and delete the processed files
            CancellationToken token = s_tokenSource.Token;

            int minWorkerThreads, minCompletionPortThreads, maxWorkerThreads, maxCompletionPortThreads;
            ThreadPool.GetMinThreads(out minWorkerThreads, out minCompletionPortThreads);
            ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
            ThreadPool.SetMaxThreads(minWorkerThreads * 2, maxCompletionPortThreads);

            signatures.ForEach(path =>
            {
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    token.ThrowIfCancellationRequested();

                    // Read the current file and upload the necessary certificates from it to the store
                    if (UploadSigningCerts(mainStore, path, importStats))
                    {
                        if (configuration.NeedCleaning)
                        {
                            lock (s_toDeleteListLockObj)
                                toDelete.Add(path);
                        }
                    }

                    // Intermediate save of the store and deletion of processed files
                    lock (s_intermediateSaveLockObj)
                    {
                        if (++processedSignatures > processedForSave * saveIndex)
                        {
                            LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");

                            mainStore.WriteIfChanged();
                            StartRemovingSignatures(toDelete);
                            saveIndex++;
                        }
                    }
                }, token));
            });

            try
            {
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException ae)
            {
                foreach (Exception e in ae.InnerExceptions)
                {
                    if (e is not TaskCanceledException)
                        LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
                }
            }
            mainStore.WriteIfChanged();
            StartRemovingSignatures(toDelete);
            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    catch (Exception)
    {
        throw;
    }
    finally
    {
        IsWorking = false;
    }
}

Method UploadSigningCerts:

private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    bool toBeDeleted = true;
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

    try
    {
        List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();

        Interlocked.Add(ref importStats.all, certs.Count);

        for (int i = 0; i < certs.Count; i++)
        {
            lock (s_importLockObj)
            {
                // Validating each certificate from a file, making an import decision, importing to the store...
            }
        }
        return toBeDeleted;
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
        LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
        Interlocked.Increment(ref importStats.errors);
        return false;
    }
}

Method StartRemovingSignatures:

private static void StartRemovingSignatures(List<string> toDelete)
{
    if (toDelete.Count > 0)
    {
        List<string> tempToDelete;
        lock (s_toDeleteListLockObj)
        {
            tempToDelete = new List<string>(toDelete);
            toDelete.Clear();
        }

        LogsHelper.WriteEventLog("Deleting successfully processed signature files...");

        Task.Factory.StartNew(() =>
        {
            tempToDelete.ForEach(path =>
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
                }
            });
        });
    }
}

Error text:

20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
   at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
   at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
   at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Store.Services.SignatureImportService.ImportSigningCerts()
   at Store.Controllers.SettingsController.ImportSignatures()
  • To start, try removing the _Directory.EnumerateFiles(...).ToList()_, because this loads all the file paths into memory. Instead of ToList, leave EnumerateFiles alone and use its IEnumerable return value to feed the foreach loop – Steve Aug 23 '23 at 12:53
  • Multi-threading will make the issue worse. I would add a `using` block in the foreach so that after each file is processed the object is disposed; that may solve the issue. – jdweng Aug 23 '23 at 12:55
  • Also, the line that takes the _Count_ forces materialization of the IEnumerable – Steve Aug 23 '23 at 12:55
  • The exception call stack clearly indicates that "The common language runtime cannot allocate enough contiguous memory to successfully perform an operation", so read the Remarks section and follow the tips, https://learn.microsoft.com/en-us/dotnet/api/system.outofmemoryexception?view=net-7.0#remarks – Lex Li Aug 23 '23 at 14:50
  • This exception usually occurs when the memory usage of the application exceeds the available memory due to a large number of concurrently executing tasks. While multithreading can improve processing speed, too many concurrent threads can lead to memory exhaustion. Instead of starting tasks for all files at once, consider limiting the number of concurrent tasks. You can use SemaphoreSlim to control the maximum number of active tasks (see the sketch after these comments). – YurongDai Aug 24 '23 at 03:13
  • @Steve I need the number of files for the progress bar. Is there another way to get it without materializing the IEnumerable? – Dany Aug 24 '23 at 11:02
  • @jdweng If I use a `using` block to work with the `Task`, I won't be able to wait for the completion of all the tasks at the end. Or am I misunderstanding something? – Dany Aug 24 '23 at 11:04
  • @YurongDai I limit threads using `ThreadPool.SetMaxThreads`. Do you think that doesn't help? Or is the issue not how many threads execute at once, but how many tasks are created at once? – Dany Aug 24 '23 at 11:13
  • You have a lock so only one gets executed at a time. – jdweng Aug 24 '23 at 11:42
  • In the overall scheme, the progress bar is probably not as critical as the rest of your work. Replace it with a counter that increments as each file is processed, just to show your users the program is not stuck but still working. They need to wait. – Steve Aug 24 '23 at 12:27
  • Have you considered using the [`Parallel.ForEach`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreach) method? – Theodor Zoulias Aug 28 '23 at 19:00
  • @TheodorZoulias, Thanks, I have already solved the problem – Dany Aug 28 '23 at 19:04
  • @Steve - `.Count` ***never*** materializes the enumerable. And `.Count()` only does so in limited situations. It wouldn't here if `.Count` were replaced with `.Count()`. – Enigmativity Aug 31 '23 at 22:38
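To make Steve's and YurongDai's suggestions above concrete, here is a minimal sketch (not code from the question or the answer) that streams paths with Directory.EnumerateFiles instead of materializing the whole list, and caps concurrency with SemaphoreSlim. ProcessFileAsync, ThrottledImport, and the concurrency limit of 8 are hypothetical placeholders:

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledImport
{
    // Hypothetical stand-in for the real per-file work
    // (read the .sig file, import its certificates).
    private static Task ProcessFileAsync(string path) => Task.CompletedTask;

    public static async Task RunAsync(string directory)
    {
        const int maxConcurrency = 8; // arbitrary example value; tune per machine
        using var semaphore = new SemaphoreSlim(maxConcurrency);
        int processed = 0; // progress counter, as Steve suggests, instead of a precomputed total

        // EnumerateFiles streams the paths lazily; no 6-million-element list is built.
        foreach (string path in Directory.EnumerateFiles(directory, "*.sig"))
        {
            await semaphore.WaitAsync();
            _ = Task.Run(async () =>
            {
                try
                {
                    await ProcessFileAsync(path);
                    Interlocked.Increment(ref processed);
                }
                catch (Exception)
                {
                    // Real code would log the per-file error here, as the question's code does.
                }
                finally
                {
                    semaphore.Release();
                }
            });
        }

        // Drain: re-acquire every permit, which completes only after all queued work has finished.
        for (int i = 0; i < maxConcurrency; i++)
            await semaphore.WaitAsync();
    }
}

Unlike the original code, no list of 6 million Task objects is kept alive; at most maxConcurrency tasks exist at any moment.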

1 Answer


I have fixed the System.OutOfMemoryException and want to share what I did. Key points:

  • I refactored the code; the methods ImportSignatures (in the controller), ImportSigningCerts, and UploadSigningCerts are now asynchronous;
  • Instead of Tasks accumulated in a list, I use Parallel.ForEachAsync with a MaxDegreeOfParallelism = Environment.ProcessorCount * 3 limit.

Now my code looks like this.

Method ImportSignatures from controller:

[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    try
    {
        ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
        return Json(res);
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

Method ImportSigningCerts:

public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");

    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    signaturesCount = 0;
    processedSignatures = 0;

    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
            signaturesCount = signatures.Length;

            Store mainStore = StoreMan.GetStore("Main");
            ImportStats importStats = new();

            const int partSize = 100000; // Number of processed signatures after which to do an intermediate store save and delete the processed files

            var options = new ParallelOptions()
            {
                CancellationToken = s_tokenSource.Token,
                MaxDegreeOfParallelism = Environment.ProcessorCount * 3
            };

            try
            {   // Split the full array into chunks of partSize elements so an intermediate save and deletion of processed files can happen after each chunk
                foreach (string[] signsChunk in signatures.Chunk(partSize))
                {
                    List<string> toDelete = new();
                    try
                    {
                        await Parallel.ForEachAsync(signsChunk, options, async (signPath, _) =>
                        {
                            bool canBeRemoved = await UploadSigningCerts(mainStore, signPath, importStats);
                            if (canBeRemoved && configuration.NeedCleaning)
                            {
                                lock (s_toDeleteListLockObj)
                                    toDelete.Add(signPath);
                            }
                            Interlocked.Increment(ref processedSignatures);
                        });
                    }
                    finally
                    {
                        LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");

                        mainStore.WriteIfChanged();
                        StartRemovingSignatures(toDelete);
                    }
                }
            }
            catch (OperationCanceledException) { }
            catch (Exception e)
            {
                LogsHelper.WriteLog("DeloWebSignatureImportService/ImportSigningCerts:Parallel.ForEachAsync", e);
            }

            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    finally
    {
        IsWorking = false;
    }
}

Method UploadSigningCerts:

private static async Task<bool> UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

    bool toBeDeleted = true;
    byte[] signatureData = await File.ReadAllBytesAsync(path);
    CertInfo[] certs = Array.Empty<CertInfo>();

    try
    {
        certs = client.GetSignCmsInfo(signatureData).Certs;

        Interlocked.Add(ref importStats.all, certs.Length);

        for (int i = 0; i < certs.Length; i++)
        {
            lock (s_importLockObj)
            {
                // Validating each certificate from a signature, making an import decision, importing to the store...
            }
        }
        return toBeDeleted;
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
        LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
        Interlocked.Increment(ref importStats.errors);
        return false;
    }
}

Everything else remained unchanged. Thanks to everyone who wrote comments; you guided me onto the right path))

  • This looks like a verbose, inefficient, and potentially buggy attempt to reinvent the functionality that is already available in the [`Parallel`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel) class. – Theodor Zoulias Aug 28 '23 at 19:09
  • @TheodorZoulias, do you think so? I will study your suggestion, maybe you are right... I'll text you later – Dany Aug 28 '23 at 19:15
  • @TheodorZoulias, I implemented your approach using `Parallel.ForEach`. The code really does look neater, nothing superfluous. However, for some reason, when testing Parallel.ForEach with `MaxDegreeOfParallelism` set to 2x the core count, it was much slower: 11h of work versus 7h using `SemaphoreSlim` and `Task`s. I have now set the limit to 3x, which is more or less on par with the "best" result: 8h. I think I'll leave it that way. In any case, thank you for your advice)) – Dany Aug 30 '23 at 09:20
  • Try calling `ThreadPool.SetMinThreads(options.MaxDegreeOfParallelism, Environment.ProcessorCount);` before starting the parallel loop (see the sketch after these comments). The `Parallel.ForEach` uses `ThreadPool` threads, so you will need plenty of them if you are doing blocking I/O. Alternatively you could switch to asynchronous I/O and the `Parallel.ForEachAsync` API, so that you can have high concurrency using only a few threads. You could also take a look at [this answer](https://stackoverflow.com/questions/9538452/what-does-maxdegreeofparallelism-do/75287075#75287075) of mine. – Theodor Zoulias Aug 30 '23 at 10:13
  • Btw I see that you are using the `Parallel.ForEach` inside a `for` loop. This is not the correct usage of this API. Also, seeing the line `Interlocked.Increment(ref processedSignatures);`, I have the impression that you are still trying to do manually the work that `Parallel.ForEach` is designed to do for you. Your code should be much simpler than it currently is. For batching the `signatures` you could use the [`Chunk`](https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.chunk) LINQ operator. – Theodor Zoulias Aug 30 '23 at 10:29
  • @TheodorZoulias, I don't need to use `ThreadPool.SetMinThreads`, because while the program runs I observe that the maximum number of threads is in use, equal to the limit I set. If I don't set a limit, I average about 60 threads on a 4-core machine... The threads are used to the maximum – Dany Aug 30 '23 at 12:16
  • @TheodorZoulias, why is using `Parallel.ForEach` inside a `for` loop not correct? I found this approach better than blocking threads inside `Parallel.ForEach` just to track the number of processed files for saving the store. In my tests, without the outer loop the program ran for about 15 hours; with it, 7 hours... – Dany Aug 30 '23 at 12:27
  • @TheodorZoulias, I didn't understand why you don't like the line `Interlocked.Increment(ref processedSignatures);`. What other options are there for performing an increment inside threads without blocking? – Dany Aug 30 '23 at 12:30
  • When a `Parallel.ForEach` operation is about to complete, the effective degree of parallelism drops. That's because the workload is rarely balanced so perfectly that all workers keep running until the last millisecond. Placing the `Parallel.ForEach` in a loop means that you are going to pay this "penalty of imbalance" multiple times instead of just once. As for the `Interlocked.Increment`, it's not that I don't like it. You might have legitimate reasons to use it. I haven't studied your code deeply. I am just expressing my feeling, based on what superficially looks like a red flag of misuse. – Theodor Zoulias Aug 30 '23 at 13:02
  • If you are using the `Interlocked.Increment(ref processedSignatures);` for reporting progress, it's OK. If you are using it for controlling the execution flow, it's a red flag of doing something that you shouldn't. – Theodor Zoulias Aug 30 '23 at 13:14
  • @TheodorZoulias, `processedSignatures` is used for reporting progress. Okay, I totally understand you. Thanks a lot :-) – Dany Aug 30 '23 at 14:14
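For reference, here is a minimal sketch of the `ThreadPool.SetMinThreads` suggestion from the comments above, assuming the synchronous `Parallel.ForEach` with blocking I/O. `ParallelImportSketch` is a hypothetical wrapper, `signatures` stands in for the path array from the answer, and the loop body is a placeholder:

using System;
using System.Threading;
using System.Threading.Tasks;

static class ParallelImportSketch
{
    public static void Run(string[] signatures)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount * 3
        };

        // Pre-grow the pool so the loop does not stall while the ThreadPool
        // slowly injects new threads to replace workers blocked on I/O.
        ThreadPool.SetMinThreads(options.MaxDegreeOfParallelism, Environment.ProcessorCount);

        Parallel.ForEach(signatures, options, path =>
        {
            // Blocking per-file work (read the .sig file, import certificates) goes here.
        });
    }
}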