I have code that grabs images from my MongoDB database and then uploads them to my Azure storage, as seen below:

public async Task UploadAssetsAsync(Func<GridFSFileInfo, string> prefixSelector, List<GridFSFileInfo> files, Func<GridFSFileInfo, Task<Stream>> streamOpener, Func<string, Task> progressAction)
        {
            if (flyersContainerClient == null)
                throw new Exception("Container client not initialized. Please initialize before doing blob operations.");
            var q = new Queue<Task<Response<BlobContentInfo>>>();
            progressAction?.Invoke($"{files.Count}");
            foreach (var f in files)
            {
                var pathPrefix = prefixSelector(f);
                var blobClient = flyersContainerClient.GetBlobClient($"{pathPrefix}/{f.Filename.Replace("_copy", "")}");
                IDictionary<string, string> metadata = new Dictionary<string, string>();
                var blobhttpheader = new BlobHttpHeaders();
                if (f.Filename.EndsWith("svg"))
                {

                    blobhttpheader.ContentType = "image/svg+xml";
                }
                var stream = await streamOpener(f);
                if (pathPrefix == "thumbnails")
                {
                    var format = ImageFormat.Jpeg;
                    Bitmap cropped = null;
                    using (Image image = Image.FromStream(stream))
                    {
                        format = image.RawFormat;
                        Rectangle rect = new Rectangle(0, 0, image.Width, (image.Width * 3) / 4);
                        cropped = new Bitmap(image.Width, (image.Width * 3) / 4);
                        using (Graphics g = Graphics.FromImage(cropped))
                        {
                            g.DrawImage(image, new Rectangle(0, 0, cropped.Width, cropped.Height), rect, GraphicsUnit.Pixel);
                        }
                    }
                    stream.Dispose();                    
                    stream = new MemoryStream();                                     
                    cropped.Save(stream, format);
                    stream.Position = 0;
                }
                //await blobClient.SetMetadataAsync(metadata);
                q.Enqueue(blobClient.UploadAsync(stream, new BlobUploadOptions { HttpHeaders = blobhttpheader, TransferOptions = new Azure.Storage.StorageTransferOptions { MaximumConcurrency = 8, InitialTransferSize = 50 * 1024 * 1024 } }));
                //await blobClient.SetHttpHeadersAsync(blobHttpHeader);
            }
            await Task.WhenAll(q);
        }        

You can see what it does once the foreach starts iterating over my list of files. I also use an asynchronous wait at the end, the WhenAll at the bottom, which waits for all of the tasks in my q variable to finish. Would it benefit my code to use Parallel.ForEach for uploading my files, or is there a faster way to achieve what I am doing?

Thanks for the help!

Adil15
  • There's too much code trying to reinvent existing features. `Parallel.ForEach` is only meant for in-memory data parallelism. Use `Parallel.ForEachAsync` to execute concurrent asynchronous operations with a fixed DOP. Progress reporting is provided by the `IProgress<T>` interface and `Progress<T>` class. Use Channels instead of `Queue<Task<Response<BlobContentInfo>>>` if you want an asynchronous pub/sub. The code is doing too many things: processing images and uploading them. It's far simpler to split this into two separate methods – Panagiotis Kanavos Nov 17 '22 at 15:27
  • You can use the TPL Dataflow classes to construct a pipeline of steps that process the files and upload them concurrently, similar to how a shell pipeline would work. A Dataflow block has its own async buffer, so all you need to do is pass the processing delegate/lambda – Panagiotis Kanavos Nov 17 '22 at 15:28
  • Thanks for your response @PanagiotisKanavos is there any documentation I should look at specifically? Or a small code snippet of how I should be dividing my code? – Adil15 Nov 17 '22 at 15:31
  • I added an example. No matter what you choose, extract the crawling, resizing, uploading code in separate methods. Each operation is complex enough that it's better to split them, especially if you want to execute them concurrently – Panagiotis Kanavos Nov 17 '22 at 15:54
  • Are you targeting .NET 6 or later? – Theodor Zoulias Nov 17 '22 at 15:59
  • @TheodorZoulias Targeting .NET 5 – Adil15 Nov 17 '22 at 16:03
  • On .NET 5 you don't have the option to use the [`Parallel.ForEachAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) API. There are many custom implementations of this method that can be used in the pre-.NET 6 era. [Here](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/65251949#65251949) are a couple of them. – Theodor Zoulias Nov 17 '22 at 16:08
  • Parallel.ForEachAsync can be used instead of Parallel.ForEach but is *not* the best choice here. Resizing and uploading have different concurrency requirements. `Parallel.ForEachAsync` doesn't handle buffering between steps, and this question's code looks like an intermediary step between a file crawler and something that processes the upload responses – Panagiotis Kanavos Nov 17 '22 at 16:08
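
For readers on .NET 5, the kind of custom `Parallel.ForEachAsync` substitute mentioned in the comments above can be sketched with a `SemaphoreSlim` that caps the number of operations in flight. This is a minimal illustration, not one of the linked implementations; the class name `Throttled` and its `ForEachAsync` signature are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Throttled
{
    // Hypothetical helper: runs `body` for every item, with at most
    // `maxConcurrency` invocations in flight at any time.
    public static async Task ForEachAsync<T>(
        IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();

        foreach (var item in source)
        {
            // Wait asynchronously for a free slot before starting the next operation.
            await gate.WaitAsync();
            tasks.Add(RunAsync(item));
        }

        // Propagates the first exception, if any operation failed.
        await Task.WhenAll(tasks);

        async Task RunAsync(T item)
        {
            try { await body(item); }
            finally { gate.Release(); } // free the slot even when body throws
        }
    }
}
```

A call such as `await Throttled.ForEachAsync(files, 4, f => UploadOneAsync(f));` (where `UploadOneAsync` is a placeholder) would keep at most four uploads running. Unlike a Dataflow pipeline, this applies one concurrency limit to the whole body and does no buffering between steps.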

1 Answer

Parallel.ForEach is meant for data parallelism - processing a lot of in-memory data by using all cores at 100%. That's not what's going on here.

The most appropriate classes would be TPL Dataflow's `TransformBlock` and `ActionBlock`. Each Dataflow block processes messages in its input queue using one or more worker tasks and forwards the results to the next block in the pipeline.

This also allows using separate methods to crawl files, process images and upload them. The pipeline code could be something like this:

record ResizedImage(FileInfo File,MemoryStream Stream);

...
var options=new ExecutionDataflowBlockOptions {
    MaxDegree 
var crawler=new TransformManyBlock<string,FileInfo>(CrawlFolder);
var resizer=new TransformBlock<FileInfo,ResizedImage>(ResizeImage,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism=2,
        BoundedCapacity=4,
    });
var uploader=new TransformBlock<ResizedImage,Response<BlobContentInfo>>(UploadImage,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism=2,
        BoundedCapacity=4
    });
...

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
crawler.LinkTo(resizer,linkOptions);
resizer.LinkTo(uploader,linkOptions);
...

Each method only needs to accept an input message and return the output. Buffering and concurrency are handled by the Dataflow blocks. In this case 1 task is used to crawl, 2 tasks are used to resize images and 2 to upload. The DOP can be adjusted to achieve optimal performance.

The BoundedCapacity limits the input buffer of each block. This prevents a slow block from flooding RAM with waiting messages. When a block's input buffer is full, upstream blocks have to wait before they can process and send new messages.
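
The waiting behaviour can be seen in isolation with a single slow block. The sketch below is only an illustration: the class name `BoundedCapacityDemo`, the 20 ms delay, and the capacity of 2 are arbitrary assumptions. It uses `SendAsync`, which waits asynchronously for room in the buffer, whereas `Post` returns `false` immediately when a bounded block is full. The Dataflow types live in the `System.Threading.Tasks.Dataflow` namespace, which may need the NuGet package of the same name on older targets.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    // Sends `count` integers into a slow block whose input buffer holds at most 2 items
    // and returns them in the order the block processed them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var processed = new List<int>();

        var slowBlock = new ActionBlock<int>(async n =>
        {
            await Task.Delay(20);   // simulate slow work such as an upload
            processed.Add(n);       // no lock needed: MaxDegreeOfParallelism defaults to 1
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        for (var i = 0; i < count; i++)
        {
            // Completes only once the block has accepted the item, so the producer
            // is throttled to the consumer's pace instead of filling memory.
            await slowBlock.SendAsync(i);
        }

        slowBlock.Complete();
        await slowBlock.Completion;
        return processed;
    }
}
```

Inside a linked pipeline the same throttling happens automatically: a block offers its output to the next one and holds on to it until the downstream buffer has room.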

Once the pipeline is constructed, the application can start posting data to the head block, in this case the crawler:

foreach (var folder in folders)
{
    crawler.Post(folder);
}

Messages will flow from one block to the next. When the head block completes, it will also signal downstream blocks to complete. The application can wait for the entire pipeline to finish by awaiting the final block's `Completion` property:

crawler.Complete();
// tail stands for the final block of the pipeline
await tail.Completion;
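
Putting the posting and completion steps together, a self-contained sketch might look like the following. The string-based stages are placeholders standing in for `CrawlFolder`, `ResizeImage` and the upload, and the class name `PipelineDemo` is hypothetical. One deliberate difference from the pipeline above is an assumption of this sketch: the last block is an `ActionBlock`, because a `TransformBlock` does not complete until its output has been consumed, so awaiting it as the tail would hang unless something reads its results.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> folders)
    {
        var results = new ConcurrentBag<string>();

        // Head: one folder in, several "files" out (placeholder for CrawlFolder).
        var crawler = new TransformManyBlock<string, string>(
            folder => new[] { $"{folder}/a.jpg", $"{folder}/b.jpg" });

        // Middle: placeholder for ResizeImage.
        var resizer = new TransformBlock<string, string>(
            file => file + ":resized",
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 4 });

        // Tail: placeholder for the upload. An ActionBlock produces no output,
        // so its Completion finishes as soon as all items have been processed.
        var uploader = new ActionBlock<string>(
            async item => { await Task.Delay(10); results.Add(item + ":uploaded"); },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, BoundedCapacity = 4 });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        crawler.LinkTo(resizer, linkOptions);
        resizer.LinkTo(uploader, linkOptions);

        foreach (var folder in folders)
            crawler.Post(folder);   // the crawler is unbounded, so Post always accepts

        crawler.Complete();         // completion propagates down the links
        await uploader.Completion;  // finishes after every item reached the tail

        return results.OrderBy(s => s, StringComparer.Ordinal).ToList();
    }
}
```

If the upload responses are needed afterwards, the tail can either collect them as shown here or stay a `TransformBlock` linked to a further block that consumes them.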

Now that the worker methods don't have to deal with concurrency, they become a lot simpler:

IEnumerable<FileInfo> CrawlFolder(string folder)
{
    var di=new DirectoryInfo(folder);
    return di.EnumerateFiles("*.*");
}

ResizedImage ResizeImage(FileInfo file)
{
    // FileInfo.Open requires a FileMode argument; OpenRead opens the file read-only
    using var stream=file.OpenRead();
    using (Image image = Image.FromStream(stream))
    {
        var format = image.RawFormat;
        Rectangle rect = new Rectangle(0, 0, image.Width, (image.Width * 3) / 4);
        using var cropped = new Bitmap(image.Width, (image.Width * 3) / 4);
        using (Graphics g = Graphics.FromImage(cropped))
        {
            g.DrawImage(image, new Rectangle(0, 0, cropped.Width, cropped.Height), rect, GraphicsUnit.Pixel);
        }

        var newStream = new MemoryStream();                                     
        cropped.Save(newStream, format);
        newStream.Position = 0;
        return new ResizedImage(file,newStream);
    }
}


async Task<Response<BlobContentInfo>> UploadImage(ResizedImage rsImage)
{
...
}
Panagiotis Kanavos
  • Also notice how these tools use **queues** to manage the "parallelism" in a controlled manner. The strategy that you contemplate would simply "hurl requests at" the remote interface, and this would very quickly overload it. The process would "thrash," and wind up running **much slower** than it otherwise would. Panagiotis' analysis and recommendation are very sound. – Mike Robinson Nov 17 '22 at 16:03
  • @Adil15 I agree with Panagiotis Kanavos that the TPL Dataflow is a more sophisticated tool for this job, but personally I have lost a big chunk of my faith in this library after discovering that it is [subtly broken](https://stackoverflow.com/questions/21603428/tpl-dataflow-exception-in-transform-block-with-bounded-capacity/69209696#69209696), with not much hope that it will be fixed any time soon. The [reported issue](https://github.com/dotnet/runtime/issues/52348) has not even been labeled as "bug" yet. – Theodor Zoulias Nov 17 '22 at 16:19
  • A demonstration of the broken behavior is [here](https://dotnetfiddle.net/ZzWiEl). The first block (`block1`) of this simple pipeline never completes. – Theodor Zoulias Nov 17 '22 at 16:39
  • If it was broken someone would have gotten a 50Kg robotic arm on the head - Dataflow evolved from the Microsoft Robotics Framework. That's simply not how CSP is meant to work. In all CSP and messaging systems links are oneway and errors *don't* flow upstream. Nor is a block supposed to allow exceptions to escape. If something like that happened eg in RabbitMQ, the broker would terminate the connection. – Panagiotis Kanavos Nov 17 '22 at 16:42
  • Those are edge cases that depend on unusual conditions and may or may not get resolved when time becomes available. They don't affect the applications that do use CSP, dataflow or pipelines though. – Panagiotis Kanavos Nov 17 '22 at 16:44
  • @PanagiotisKanavos at this point ```var options=new ExecutionDataflowBlockOptions { MaxDegree``` did you mean to write ```MaxDegreeOfParallelism=2```? – Adil15 Nov 17 '22 at 16:45
  • Panagiotis Kanavos these are not preventable edge cases. These are non-deterministic behaviors that can happen under conditions that are outside of your control. The pipeline may work correctly 10,000 times, and the 10,001st time the timing of an exogenous exception might happen at exactly the "right" moment, resulting in a deadlock. Personally I am not comfortable with this, and I think that you wouldn't be comfortable either if this thing was controlling a 50Kg robotic arm in your laboratory. – Theodor Zoulias Nov 17 '22 at 16:52
  • @PanagiotisKanavos Can you add a little more detail to the first part of the code you shared? I am struggling to figure out how to write it. Thanks! – Adil15 Nov 17 '22 at 17:24
  • Btw I just became aware of another Dataflow bug, which is 3 years old and still unsolved: [Data loss during parallelization of BufferBlock](https://github.com/dotnet/runtime/issues/31513). – Theodor Zoulias Nov 24 '22 at 17:26