
I am processing PDFs of vastly varying sizes (from simple 2 MB files to high-DPI scans of a few hundred MB) via a Parallel.ForEach and am occasionally getting an OutOfMemoryException - understandably, since the process is 32-bit and the threads spawned by the Parallel.ForEach take on an unknown amount of memory-consuming work.

Restricting MaxDegreeOfParallelism does work, but the throughput is not sufficient when there is a large (10k+) batch of small PDFs to process, since more threads could be running given the small memory footprint of those work items. This is a CPU-heavy process, with Parallel.ForEach easily reaching 100% CPU before hitting the occasional group of large PDFs and getting an OutOfMemoryException. Running the Performance Profiler backs this up.
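For reference, the static approach I mean looks something like the sketch below, where pdfPaths and ProcessPdf are placeholders for my input list and the CPU-heavy per-file work:

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(pdfPaths, options, path => ProcessPdf(path)); // ProcessPdf is a placeholder

This keeps memory in check only by capping the thread count, regardless of how small the files are.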

From my understanding, having a partitioner for my Parallel.ForEach won't improve my performance.

This leads me to using a custom TaskScheduler, passed to my Parallel.ForEach, with a MemoryFailPoint check. Searching around, it seems information on creating custom TaskScheduler objects is scarce.

Looking between Specialized Task Schedulers in .NET 4 Parallel Extensions Extras, A custom TaskScheduler in C# and various answers here on Stack Overflow, I've created my own TaskScheduler with the QueueTask method implemented as follows:

protected override void QueueTask(Task task)
{
    lock (tasks) tasks.AddLast(task);
    try
    {
        // Gate dispatching on ~600MB of memory being available.
        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600))
        {
            if (runningOrQueuedCount < maxDegreeOfParallelism)
            {
                runningOrQueuedCount++;
                RunTasks();
            }
        }
    }
    catch (InsufficientMemoryException)
    {
        // somehow return thread to pool?
        Console.WriteLine("InsufficientMemoryException");
    }
}

While the try/catch is a little expensive, my goal here is to catch when the probable maximum-size PDF (plus a little extra memory overhead) of 600 MB would throw an OutOfMemoryException. This solution, though, seems to kill off the thread attempting to do the work when I catch the InsufficientMemoryException. With enough large PDFs my code ends up being a single-threaded Parallel.ForEach.
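To be clear about what the MemoryFailPoint gate is supposed to do, here it is in isolation (ProcessPdf is a placeholder, and 600 is my assumed worst-case working set in MB):

try
{
    // Checks up front whether ~600MB could be allocated; throws
    // InsufficientMemoryException instead of letting the work start
    // and later die with an OutOfMemoryException.
    using (new MemoryFailPoint(600))
    {
        ProcessPdf(path); // the reservation is released when the using block exits
    }
}
catch (InsufficientMemoryException)
{
    // Not enough headroom right now; the work item needs to be deferred.
}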

Other questions found on Stack Overflow on Parallel.ForEach and OutOfMemoryExceptions don't appear to suit my use case of maximum throughput with dynamic memory usage on threads, and often just leverage MaxDegreeOfParallelism as a static solution.

So to have maximum throughput for variable working memory sizes, either:

  • How do I return a thread back into the threadpool when it has been denied work via the MemoryFailPoint check?
  • How/where do I safely spawn new threads to pick up work again when there is free memory?

Edit: The PDF size on disk may not linearly represent its size in memory, due to the rasterization and rasterized-image manipulation involved, which depends on the PDF content.


2 Answers


Using LimitedConcurrencyLevelTaskScheduler from Samples for Parallel Programming with the .NET Framework, I was able to make a minor adjustment to get something close to what I wanted. The following is the NotifyThreadPoolOfPendingWork method of the LimitedConcurrencyLevelTaskScheduler class after modification:

private void NotifyThreadPoolOfPendingWork()
{
    ThreadPool.UnsafeQueueUserWorkItem(_ =>
    {
        // Note that the current thread is now processing work items.
        // This is necessary to enable inlining of tasks into this thread.
        _currentThreadIsProcessingItems = true;
        try
        {
            // Process all available items in the queue.
            while (true)
            {
                Task item;
                lock (_tasks)
                {
                    // When there are no more items to be processed,
                    // note that we're done processing, and get out.
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }

                    // Get the next item from the queue
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }

                // Execute the task we pulled out of the queue, but only
                // after checking that ~650MB of memory is available.

                try
                {
                    using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650))
                    {
                        base.TryExecuteTask(item);
                    }
                }
                catch (InsufficientMemoryException)
                {
                    // Back off briefly so this thread doesn't immediately
                    // re-run into the same failing memory check.
                    Thread.Sleep(500);

                    lock (_tasks)
                    {
                        // Put the task back so it can be retried when
                        // memory pressure has eased.
                        _tasks.AddLast(item);
                    }
                }

            }
        }
        // We're done processing items on the current thread
        finally { _currentThreadIsProcessingItems = false; }
    }, null);
}

Let's look at the catch block, in reverse. We add the task we were about to work on back to the list of tasks (_tasks), so that an available thread can pick that work up again. But first we sleep the current thread, so that it doesn't pick the work up straight away and run back into a failed MemoryFailPoint check.
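Assuming the rest of the class is unchanged from the sample, the modified scheduler can then be handed to Parallel.ForEach through ParallelOptions.TaskScheduler, something like this (pdfPaths and ProcessPdf being placeholders):

var scheduler = new LimitedConcurrencyLevelTaskScheduler(Environment.ProcessorCount);
var options = new ParallelOptions { TaskScheduler = scheduler };
Parallel.ForEach(pdfPaths, options, path => ProcessPdf(path));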

– Tem

The idea of a memory-aware TaskScheduler that is based on the MemoryFailPoint class is pretty neat. Here is another idea. You could limit the parallelism based on the known size of each PDF file, by using a SemaphoreSlim. Before processing a file you could acquire the semaphore as many times as the size of the file in megabytes, and after the processing is completed you could release the semaphore an equal number of times.

The tricky part is that the SemaphoreSlim doesn't have an API that acquires it atomically more than once, and acquiring it multiple times non-atomically in parallel could easily result in a deadlock. One way to synchronize the acquisition of the semaphore is to use a second new SemaphoreSlim(1, 1) as an asynchronous mutex. Another way is to move the acquisition one step back, to the enumeration phase of the source sequence. The implementation below demonstrates the second approach. It is a variant of the .NET 6 API Parallel.ForEachAsync that, on top of the existing features, is equipped with two additional parameters, sizeSelector and maxConcurrentSize:

public static Task ParallelForEachAsync_LimitedBySize<TSource>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body,
    Func<TSource, int> sizeSelector,
    int maxConcurrentSize)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(sizeSelector);
    if (maxConcurrentSize < 1)
        throw new ArgumentOutOfRangeException(nameof(maxConcurrentSize));

    SemaphoreSlim semaphore = new(maxConcurrentSize, maxConcurrentSize);

    async IAsyncEnumerable<(TSource, int)> Iterator()
    {
        foreach (TSource item in source)
        {
            int size = sizeSelector(item);
            size = Math.Clamp(size, 0, maxConcurrentSize);
            for (int i = 0; i < size; i++)
                await semaphore.WaitAsync().ConfigureAwait(false);
            yield return (item, size);
        }
    }

    return Parallel.ForEachAsync(Iterator(), parallelOptions, async (entry, ct) =>
    {
        (TSource item, int size) = entry;
        try { await body(item, ct).ConfigureAwait(false); }
        finally { if (size > 0) semaphore.Release(size); }
    });
}

Internally it calls the Parallel.ForEachAsync overload that has a source of type IAsyncEnumerable<T>.

Usage example:

ParallelOptions options = new() { MaxDegreeOfParallelism = 10 };
await ParallelForEachAsync_LimitedBySize(paths, options, async (path, ct) =>
{
    // Process the file
    await Task.CompletedTask;
}, sizeSelector: path =>
{
    // Return the size of the file in MB
    return (int)(new FileInfo(path).Length / 1_000_000);
}, maxConcurrentSize: 2_000);

The paths will be processed with a maximum parallelism of 10, and a maximum concurrent size of 2 GB.
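For completeness, here is a rough sketch of the first approach mentioned earlier, where a second new SemaphoreSlim(1, 1) is used as an asynchronous mutex so that the multi-step acquisition cannot be interleaved by another worker. The SizeGate class name is arbitrary, and size is assumed to be pre-clamped to [0, maxConcurrentSize], as in the implementation above:

class SizeGate
{
    private readonly SemaphoreSlim _semaphore;
    private readonly SemaphoreSlim _mutex = new(1, 1); // serializes the multi-step acquisition

    public SizeGate(int maxConcurrentSize)
        => _semaphore = new(maxConcurrentSize, maxConcurrentSize);

    public async Task AcquireAsync(int size)
    {
        await _mutex.WaitAsync().ConfigureAwait(false);
        try
        {
            // Only one worker at a time runs this loop, so two workers
            // can't interleave partial acquisitions and deadlock each other.
            for (int i = 0; i < size; i++)
                await _semaphore.WaitAsync().ConfigureAwait(false);
        }
        finally { _mutex.Release(); }
    }

    // Releasing doesn't need the mutex, so a worker waiting inside
    // AcquireAsync is eventually unblocked by completing workers.
    public void Release(int size) { if (size > 0) _semaphore.Release(size); }
}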

– Theodor Zoulias
  • Caution: Converting the above implementation to use the synchronous `Parallel.ForEach` instead of the asynchronous `Parallel.ForEachAsync` is not trivial. The default chunk partitioning of the `Parallel.ForEach` could cause a deadlock. The partitioner should be configured with the `EnumerablePartitionerOptions.NoBuffering` option. – Theodor Zoulias Feb 07 '23 at 01:31