0

My media bag is getting populated inside of the foreach, but when it hits the bottom line the mediaBag is empty?

 var mediaBag = new ConcurrentBag<MediaDto>();

        Parallel.ForEach(mediaList,
            new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
            async media =>
            {
                var imgBytes = await this.blobStorageService.ReadMedia(media.BlobID, Enums.MediaType.Image);
                var fileContent = Convert.ToBase64String(imgBytes);

                var image = new MediaDto()
                {
                    ImageId = media.MediaID,
                    Title = media.Title,
                    Description = media.Description,
                    ImageContent = fileContent
                };

                mediaBag.Add(image);
            });

        return mediaBag.ToList();

Is this because of my blobstorage function not being thread safe? what would this mean and what is the soultion if that is the case.

Hawkzey
  • 1,088
  • 1
  • 11
  • 21
  • 2
    At first glance I did not see an overload of `Parallel.ForEach` that takes a `Func`, so I guess it is not awaiting your async lambda. So it's finished before any data has been written to the bag. – René Vogt Sep 12 '19 at 12:08
  • 7
    `Parallel.ForEach` doesn't work well with async actions: https://stackoverflow.com/questions/57675722/how-to-wait-for-the-result-while-controller-is-making-parallel-call/57676089#57676089 – mm8 Sep 12 '19 at 12:09
  • What is the *real* problem and why did you use `Parallel.ForEach`? The `Parallel` methods are meant for data parallelism. They use all available cores to partition and process a lot of data. They don't work *at all* with asynchronous methods. If you want to download blobs at a fixed DOP you can use eg an ActionBlock – Panagiotis Kanavos Sep 12 '19 at 12:35
  • Go to Project|Properties, Build tab and set "Treat Warnings as errors" to "All". – H H Sep 12 '19 at 13:13
  • @HenkHolterman that is not what i have done, i have simply misunderstood the meaning of parallelism. – Hawkzey Sep 20 '19 at 08:58

3 Answers3

0

Parallel.ForEach doesn't work well with async actions.

You could start and store the tasks returned by ReadMedia in an array and then wait for them all to complete using Task.WhenAll before you create the MediaDto objects in parallel. Something like this:

var mediaBag = new ConcurrentBag<MediaDto>();

Task<byte[]>[] tasks = mediaList.Select(media => blobStorageService.ReadMedia(media.BlobID, Enums.MediaType.Image)).ToArray();
await Task.WhenAll(tasks);
Parallel.ForEach(imgBytes, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    bytes =>
    {
        var fileContent = Convert.ToBase64String(imgBytes);
        var image = new MediaDto()
        {
            ImageId = media.MediaID,
            Title = media.Title,
            Description = media.Description,
            ImageContent = fileContent
        };
        mediaBag.Add(image);
    });
return mediaBag.ToList();
mm8
  • 163,881
  • 10
  • 57
  • 88
0

Parallelism isn't concurrency. Parallel.ForEach is meant for data parallelism, not executing concurrent actions. It partitions the input data and uses as many worker tasks as there are cores to process one partition each. It doesn't work at all with asynchronous methods because that would defeat its very purpose.

What you ask for is concurrent operations - eg downloading 100 files, 4 or 6 at a time. One way would be to just launch all 100 tasks and wait for them to finish. That's a bit extreme and will probably flood the network connection.

A better way to do this would be to use a TPL Dataflow block like TransformBlock with a specific DOP, eg :

var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
var buffer=new BufferBlock<MediaDto>();
var block=new TransformBlock<ThatMedia,MediaDto>(media =>{
    var imgBytes = await this.blobStorageService.ReadMedia(media.BlobID, Enums.MediaType.Image);
    var fileContent = Convert.ToBase64String(imgBytes);

    var image = new MediaDto()
            {
                ImageId = media.MediaID,
                Title = media.Title,
                Description = media.Description,
                ImageContent = fileContent
            };
    return image;
},options);
block.LinkTo(buffer);

After that, you can start posting entries to the block.

foreach(var entry in mediaList)
{
    block.Post(entry);
}
block.Complete();
await block.Completion;
if(buffer.TryReceiveAll(out var theNewList))
{
    ...
}
Panagiotis Kanavos
  • 120,703
  • 13
  • 188
  • 236
-1

Thanks for the advice, i believe i may of misunderstood a 'Parallel.ForEach' usecase.

i have modified the function to use a list of tasks instead and it works very nicely. Below is the changes i made.

var mediaBag = new ConcurrentBag<MediaDto>();

        IEnumerable<Task> mediaTasks = mediaList.Select(async m =>
        {
            var imgBytes = await this.blobStorageService.ReadMedia(m.BlobID, Enums.MediaType.Image);
            var fileContent = Convert.ToBase64String(imgBytes);

            var image = new MediaDto()
            {
                ImageId = m.MediaID,
                Title = m.Title,
                Description = m.Description,
                ImageContent = fileContent
            };

            mediaBag.Add(image);
        });

        await Task.WhenAll(mediaTasks);

        return mediaBag.ToList();
Hawkzey
  • 1,088
  • 1
  • 11
  • 21