
I am trying to upload files using `Parallel.ForEachAsync`. It works, but it loses the sort order. Is there any way to keep the order of the destination list in sync with the source list?

await Parallel.ForEachAsync(model.DestinationFiles,
    new ParallelOptions { MaxDegreeOfParallelism = 20 }, async (file, cancellationToken) =>
    {
        var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
        // Results are added in completion order, not in source order.
        // (If Files is a List<T>, concurrent Add calls are also not thread-safe.)
        convertResultDto.Files.Add(new ConverterConvertResultFile(storeAsync));
    });

Previously I used PLINQ (Parallel LINQ), which has the `AsOrdered` operator to preserve the source order. Still, I think `Parallel.ForEachAsync` is a better fit for async methods in an I/O-bound scenario. Is that right?

var storeFiles = model.DestinationFiles.AsParallel().AsOrdered().WithDegreeOfParallelism(50)
    // GetAwaiter().GetResult() blocks a worker thread for the whole duration of each upload.
    .Select(file => StoreAsync(file.FileInfo, false, file.OutputFileName).GetAwaiter().GetResult())
    .Select(storeFile => new StoreFile
    {
        FileId = storeFile.FileId,
        Url = storeFile.Url,
        OutputFileName = storeFile.OutputFileName,
        Size = storeFile.Size
    });
Theodor Zoulias
Tomas

1 Answer


In this case, you want to get a set of results and store them in a resulting collection. `Parallel` is designed more for operations without results. For operations with results, you can use PLINQ for CPU-bound operations or asynchronous concurrency for I/O-bound operations. Unfortunately, there isn't a result-returning, PLINQ-style counterpart to `Parallel.ForEachAsync`, which would be the closest equivalent to your current code.

Asynchronous concurrency uses Task.WhenAll to get the results of multiple asynchronous operations. It can also use SemaphoreSlim for throttling. Something like this:

var mutex = new SemaphoreSlim(20); // allow at most 20 concurrent uploads
var results = await Task.WhenAll(model.DestinationFiles.Select(async file =>
{
  await mutex.WaitAsync();
  try
  {
    var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
    return new ConverterConvertResultFile(storeAsync);
  }
  finally { mutex.Release(); }
}));
// Task.WhenAll returns the results in the same order as the source items.
convertResultDto.Files.AddRange(results);
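The ordering guarantee comes from `Task.WhenAll`, whose result array follows the order of the input tasks rather than their completion order. A self-contained sketch that makes this visible: `FakeStoreAsync` is a hypothetical stand-in for `StoreAsync` whose artificial delay makes later items tend to finish first, and the throttle of 3 is an arbitrary choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for StoreAsync: later items get shorter delays,
// so they tend to complete before earlier ones.
async Task<string> FakeStoreAsync(int index, int total)
{
    await Task.Delay((total - index) * 20);
    return $"file-{index}";
}

const int Total = 10;
var throttle = new SemaphoreSlim(3); // at most 3 "uploads" in flight
var completionOrder = new ConcurrentQueue<int>();

string[] results = await Task.WhenAll(Enumerable.Range(0, Total).Select(async i =>
{
    await throttle.WaitAsync();
    try
    {
        var stored = await FakeStoreAsync(i, Total);
        completionOrder.Enqueue(i); // records the order in which items finished
        return stored;
    }
    finally { throttle.Release(); }
}));

// results is in source order; completionOrder typically is not.
Console.WriteLine(string.Join(", ", results));
Console.WriteLine("completed: " + string.Join(", ", completionOrder));
```

The first line lists `file-0` through `file-9` in order, while the completion log shows the order in which the delays actually elapsed.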

However, if you have a mixture of CPU-bound and I/O-bound operations, then you'll probably want to keep using `ForEachAsync`. In that case, you can create the entries in your destination collection first, then pass each operation its index so it knows where to store its result:

// This code assumes convertResultDto.Files is empty at this point.
var count = model.DestinationFiles.Count;
// Reserve one slot per source file; slot i will hold the result for file i.
convertResultDto.Files.AddRange(Enumerable.Repeat<ConverterConvertResultFile>(null!, count));
await Parallel.ForEachAsync(
    model.DestinationFiles.Select((file, i) => (file, i)),
    new ParallelOptions { MaxDegreeOfParallelism = 20 },
    async (item, cancellationToken) =>
    {
      var (file, i) = item;
      var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
      convertResultDto.Files[i] = new ConverterConvertResultFile(storeAsync);
    });
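The index-slot approach can be tried in isolation with a small sketch (requires .NET 6 or later for `Parallel.ForEachAsync`). `FakeStoreAsync` and the file names are hypothetical; the destination is a pre-sized array here, where writes to distinct elements from different threads do not interfere.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for StoreAsync: upper-cases the name after a delay.
async Task<string> FakeStoreAsync(string name, int delayMs, CancellationToken ct)
{
    await Task.Delay(delayMs, ct);
    return name.ToUpperInvariant();
}

var sources = new List<string> { "a.pdf", "b.pdf", "c.pdf", "d.pdf", "e.pdf" };

// Pre-sized destination: slot i is reserved for the result of sources[i].
var stored = new string[sources.Count];

await Parallel.ForEachAsync(
    sources.Select((file, i) => (file, i)),
    new ParallelOptions { MaxDegreeOfParallelism = 3 },
    async (item, cancellationToken) =>
    {
        var (file, i) = item;
        // Later items get shorter delays, so they tend to finish first.
        var result = await FakeStoreAsync(file, (sources.Count - i) * 30, cancellationToken);
        stored[i] = result; // each worker writes only its own slot
    });

// Source order is preserved regardless of completion order.
Console.WriteLine(string.Join(", ", stored)); // A.PDF, B.PDF, C.PDF, D.PDF, E.PDF
```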
Stephen Cleary
  • The `SemaphoreSlim`+`Task.WhenAll` solution has the drawback that it doesn't complete early in case of an exception. If you have 1,000 items to process and the first item fails, the remaining 999 items will still be processed, only to surface the same exception at the end. – Theodor Zoulias Jan 06 '23 at 16:01
  • Does `ForEachAsync` complete early in case of exception? – Stephen Cleary Jan 06 '23 at 16:37
  • Yep. It also cancels the `CancellationToken` that is passed to the `body`, for extra fast completion. – Theodor Zoulias Jan 06 '23 at 16:40
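The early-completion behavior described in these comments can be observed with a sketch like the following. It is an illustration, not a benchmark: item 0 fails immediately, the other bodies simulate I/O with `Task.Delay` observing the token that `ForEachAsync` passes in, and the exact number of bodies that start depends on timing. It should stay far below 1,000, whereas the `Task.WhenAll` version creates a task for every item up front.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int Total = 1000;
int started = 0;

try
{
    await Parallel.ForEachAsync(
        Enumerable.Range(0, Total),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        async (i, cancellationToken) =>
        {
            Interlocked.Increment(ref started); // count bodies that actually ran
            if (i == 0) throw new InvalidOperationException("item 0 failed");
            // Simulated I/O; canceled when ForEachAsync reacts to the failure.
            await Task.Delay(100, cancellationToken);
        });
}
catch (Exception ex)
{
    Console.WriteLine($"ForEachAsync failed with {ex.GetType().Name}");
}

Console.WriteLine($"Bodies started: {started} of {Total}");
```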