-3

When the Parallel.ForEach loop iterates over a large pickedFiles collection with a big number of files ~300 than my application fail with System.OutOfMemoryException. When the files number is small everything is ok. This error happens only in x86 compile mode, in x64 compile mode application won't fall.

Total size of files is 55Gb. Some file few kb large others can be up to 20 GB.

How to fix this error? I really need to use Parallel.ForEach because the speed is much faster than in an ordinary loop, but I can't figure out why this error occurs. The application memory usage doesn't exceed 900 MB in x-86 debug mode when I analyze in performance profiler.

Here is my UWP solution 1drv.ms/u/s!AmDz4cAaOTuzgzAsuSt3P6v9ouCF?e=qVkJeI

public sealed partial class MainPage : Page
    {

        FileOpenPicker picker { get; set; }
        IReadOnlyList<StorageFile> pickedFiles { get; set; }
        const int chunkSize = 10000000;

        public MainPage()
        {
            this.InitializeComponent();

        }

        async public void F1()
        {

            ParallelOptions parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
            Parallel.ForEach(pickedFiles, parallelOptions,async(file) =>
            {

                StorageFile createdFile = await createEmptyFileAsync(file);

                using (IRandomAccessStream stream = await file.OpenAsync(FileAccessMode.Read))
                {
                    ulong remainingSize = stream.Size;
                    ulong startPosition = 0;

                    while (remainingSize > 0)
                    {
                        using (DataReader reader = new DataReader(stream.GetInputStreamAt(startPosition)))
                        {
                            if (chunkSize >= remainingSize)
                            {
                                byte[] buffer = new byte[remainingSize];

                                await reader.LoadAsync((uint)remainingSize);
                                reader.ReadBytes(buffer);

                                await writeTo(createdFile, buffer);

                                remainingSize = 0;
                            }
                            else
                            {
                                byte[] buffer = new byte[chunkSize];

                                await reader.LoadAsync(chunkSize);
                                reader.ReadBytes(buffer);

                                await writeTo(createdFile, buffer);

                                startPosition += chunkSize;
                                remainingSize -= chunkSize;
                            }

                        }
                    }

                }
            });
            {

            }

        }


        async void GetFile(object sender, RoutedEventArgs e)
        {
            picker = new FileOpenPicker();
            picker.SuggestedStartLocation = PickerLocationId.ComputerFolder;
            picker.FileTypeFilter.Add("*");

            pickedFiles = await picker.PickMultipleFilesAsync();
            F1();
        }

        async Task writeTo(StorageFile file, byte[] write)
        {
            using (Stream x = await file.OpenStreamForWriteAsync())
            {
                x.Seek(0, SeekOrigin.End);
                await x.WriteAsync(write, 0, write.Length);
            }

        }

        async public Task<StorageFile> createEmptyFileAsync(StorageFile file)
        {
            StorageFolder knownFolders = KnownFolders.PicturesLibrary;
            StorageFile createdFile = await knownFolders.CreateFileAsync(file.Name, CreationCollisionOption.GenerateUniqueName);
            return createdFile;

        }
    }

The function evaluation was disabled because of an out of memory exception.

System.OutOfMemoryException at System.StubHelpers.MngdHiddenLengthArrayMarshaler.ConvertSpaceToNative(IntPtr pMarshalState, Object& pManagedHome, IntPtr pNativeHome) at Windows.Storage.Streams.DataReader.ReadBytes(Byte[] value) at TaskProblem.MainPage.<b__10_0>d.MoveNext() in C:\Users\Admin\source\repos\TaskProblem\TaskProblem\MainPage.xaml.cs:line 65

netGuru
  • 47
  • 5
  • 1
    `I can't figure out why this error occurs` https://stackoverflow.com/questions/8951836/why-large-object-heap-and-why-do-we-care – mjwills Sep 02 '19 at 10:39
  • 1
    The `Parallel` class is meant for data parallelism, which isn't the case here. It doesn't even support asynchronous methods. The question's code calls `async void` methods without waiting for any of them to complete. You're dealing with a lot of small files, not one large data frame – Panagiotis Kanavos Sep 02 '19 at 10:40
  • 1
    @Christopher the most common cause for an OOM is memory fragmentation. Even if the arrays are collected, the memory gets so fragmented that after a while the runtime won't be able to allocate a big-enough memory chunk. – Panagiotis Kanavos Sep 02 '19 at 10:43
  • @mjwills Does it work if you set the MaxDegreesOfParallelism to 4? NO. – netGuru Sep 02 '19 at 10:44
  • @netGuru what does your code *actually* do? What does `write` do and why is `Parallel.ForEach` used? There are better ways to do whatever it is you're trying to do. `Parallel.ForEach` is definitely *wrong*. If you need to copy the contents from multiple files you could use a simple `ActionBlock` with a *smallish* buffer. You could reuse that buffer or use eg the `ArrayPool` class to create reusable buffers – Panagiotis Kanavos Sep 02 '19 at 10:47
  • @Panagiotis Kanavos, my code reads the input files in chunks then write the chunks to other files. – netGuru Sep 02 '19 at 10:51
  • 1
    @Christopher on the contrary, the most common cause for OOM questions in SO is fragmentation. The GC will throw an OOM if it can't find a large enough memory block, not only when there's not enough memory. Besides, the OP's code is using *async void*, which means it's allocating 10MB buffers for *all* files at once – Panagiotis Kanavos Sep 02 '19 at 10:52
  • 2
    @Christopher you'll find a a **lot** of questions about Lists that throw OOMs even in x64 systems. In any case though, the OP's code is allocating thousands of arrays at once. The GC won't have a chance to run before the memory runs out. In fact, I wonder what that code does even in x64 systems as it *doesn't* wait for any file to be copied! – Panagiotis Kanavos Sep 02 '19 at 10:57
  • Have you considered using https://learn.microsoft.com/en-us/uwp/api/windows.storage.storagefile.copyasync instead? – mjwills Sep 02 '19 at 11:12
  • mjwills@ - I need to encrypt the bytes after I write them down to a new file. I do that in chunks because if a file is too large I still will fave OOM. – netGuru Sep 02 '19 at 11:19
  • In that case, we'll need to see a [mcve]. – mjwills Sep 02 '19 at 11:21
  • @netGuru encrypt how? The [CryptoStream](https://learn.microsoft.com/en-us/dotnet/api/system.security.cryptography.cryptostream?view=netframework-4.8) class can wrap another one and encrypt the data on the fly. Once you have a `CryptoStream` you can use `Stream.CopyTo` to copy data from any other stream to it – Panagiotis Kanavos Sep 02 '19 at 13:52

3 Answers3

2

Parallel methods are meant for data parallelism and don't support asynchronous methods. The lambda async (file) =>{...} passed to Parallel.ForEach is equivalent to an async void Something(StorageFile file) method. This means that the lambda returns as soon as the first await is encountered and never awaited. For eg 10000 files, this code will fire off 10000 fire-and-forget tasks that are going to allocate 10MB buffers in loops. Even for small files, that's a lot of wasted RAM.

Worse, Parallel.ForEach will return immediately after firing off all those tasks and never wait for all of them to finish. If the user closes the application too soon some files will be incomplete.

This is a concurrency, not a parallelism problem. The aim is to process multiple files concurrently, with a controlled DOP (Degree of Parallelism).

One way to do that is to use the TPL Dataflow library's ActionBlock>T class :

var options = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

var block=new ActionBlock<StorageFile>(myCopyMethod,options);

foreach(var file in pickedFiles)
{
    block.Post(file);
}

block.Complete();
await block.Completion;

The actual copying is performed by a separate method with the correct signature :

async Task myCopyMethod(StorageFile file)
{
    var createdFile = await createEmptyFileAsync(file);
    ...
}

This code will perform up to 10 concurrent copy operations, no matter how many files are posted to the block.

Copying data is primarily an IO-bound operation which means the DOP can be bigger than the number of cores. On the other hand, in a desktop application one may want to restrict the number of cores so the system remains responsive during long operations.

The code can be improved in several ways. One of them is to reuse those large buffers among files instead of allocating new ones. An easy way to do this is to use the ArrayPool class to handle "renting" and returning big arrays to a pool :

ArrayPool<byte> _myBigPool=ArrayPool.Create<byte>(10000000,20);

The copy method should use that pool instead of creating new arrays, eg:

var  buffer = _myBigPool.Rent(chunkSize);
await reader.LoadAsync(chunkSize);
reader.ReadBytes(buffer);
await writeTo(createdFile, buffer);
_myBigPool.Release(buffer);

The recommended way of using an ArrayPool is to use the Shared pool instance instead of creating a new one. That pool creates buffers whose maximum size is 1MB. Using 10MB chunks is probably too much anyway, as the program have to wait for 10MB of data to be loaded before it can write them back to a different file :

const int chunkSize=1048576;

...

var samePool = ArrayPool<byte>.Shared;

var  buffer = samePool.Rent(chunkSize);
await reader.LoadAsync(chunkSize);
reader.ReadBytes(buffer);
await writeTo(createdFile, buffer);
samePool.Release(buffer);
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
0

The Memory Limit of a x32 .NET application is by default only 2 GiB. Switches can increase that to 3 GiB and full 4 GiB. There is also a size limit for single objects (2 GiB), but I doubt it maters here. You somehow managed to reach that limit. That usually only happens if you keep something in memory, that you should have let free for the GC to collect. Or if you try to load a large file in one go (taht is one reason Enumerators are a thing).

Your mistake is propably that you are using paralellisation on a Disk or Network Bound operation. Multitasking in general and Threading in particular only helps with CPU bound operations. I like to say: "Multitasking has to pick it's problems carefully." If you do not pick the right problemm, you end up with code that is more complex, more memory demanding and most importanlty slower then the sequential counterpart. A minimum amount of Multitasking is nesssesary, but usually only "move the long running operation into a single, alternate task". With the long running operation being itterating over the file list.

Per file paralelsiation just spams the Disk with read opeartions that NCQ has to sort out again and runs teh dangar of keeping the data for all files in memory at the same time. One file after the other is just more efficient. This is not a problem massive paralelisation can solve.

Christopher
  • 9,634
  • 2
  • 17
  • 31
0

I believe your main problem is combining Parallel.ForEach with async/await. The two do not work together as discussed here : Nesting await in Parallel.ForEach

In your scenario, even with MaxDegreeOfParallelism set, what is going to happen is that possibly all of the files will be opened at the same time. This will cause lots of memory to be allocated at the same time, which GC cannot collect as it is still being used.

Euphoric
  • 12,645
  • 1
  • 30
  • 44