Parallel.ForEach and PLINQ are meant for data parallelism: processing big chunks of data using multiple cores. They suit scenarios where you have e.g. 1 GB of data in memory (or a very fast IEnumerable source) and want to process it using all cores. In such scenarios, you need to partition the data into independent chunks and have each worker crunch one chunk at a time, to limit the synchronization overhead.
What you describe, though, is concurrent uploads for a large number of files. That's pure IO, not data parallelism. Most of the time will be spent loading the data from disk or writing it to the network. This is a job for Task.Run and async/await. To upload multiple files concurrently, you could use an ActionBlock or a Channel to queue the files and upload them asynchronously. With channels you have to write a bit of worker boilerplate but you get greater control, especially in cases where you want to use e.g. the same client instance for multiple calls. An ActionBlock is essentially stateless.
Finally, you describe queues with a different degree of parallelism (DOP) based on size, which is a very nice idea when you have both big and small files. You can do that by using multiple ActionBlock instances, each with a different DOP, or multiple Channel workers, each with a different DOP.
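The Channel worker boilerplate mentioned above could look roughly like the following. This is only a minimal sketch, not part of any SDK: ChannelWorkers and RunWorkers are hypothetical names, and it assumes C# 8 on .NET Core 3.0 or later (for await foreach and ChannelReader<T>.ReadAllAsync).

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, named here only for illustration
public static class ChannelWorkers
{
    // Starts `dop` worker loops that all read from the same channel.
    // The returned task completes once the writer has called Complete()
    // and every queued item has been processed.
    public static Task RunWorkers<T>(ChannelReader<T> reader, int dop, Func<T, Task> process)
    {
        var workers = Enumerable.Range(0, dop)
            .Select(_ => Task.Run(async () =>
            {
                // Per-worker state (e.g. a client instance reused across calls)
                // could be created here, before the loop.
                await foreach (var item in reader.ReadAllAsync())
                {
                    await process(item);
                }
            }))
            .ToArray();
        return Task.WhenAll(workers);
    }
}
```

To get per-size queues you would create one channel per size class with Channel.CreateUnbounded<FileInfo>(), call RunWorkers on each reader with its own DOP, write files with Writer.TryWrite, and finally call Writer.Complete() and await the tasks returned by RunWorkers.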
Dataflows
Let's say you already have a method that uploads a single file, given its FileInfo :
//Adapted from the Google SDK example
async Task UploadFile(DriveService service, FileInfo file)
{
    // Stream the file from disk instead of loading it into memory
    using var uploadStream = file.OpenRead();
    var insertRequest = service.Files.Insert(
        new File { Title = file.Name },
        uploadStream,
        "image/jpeg");
    await insertRequest.UploadAsync();
}
You can create three different ActionBlock instances, each with a different DOP :
var small=new ActionBlock<FileInfo>(
    file=>UploadFile(service,file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 15
    });
var medium=new ActionBlock<FileInfo>(
    file=>UploadFile(service,file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    });
var big=new ActionBlock<FileInfo>(
    file=>UploadFile(service,file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2
    });
And post different files to different blocks based on size :
var directory=new DirectoryInfo(...);
var files=directory.EnumerateFiles(...);
foreach(var file in files)
{
    // FileInfo.Length is a long, so the patterns must match long, not int
    switch (file.Length)
    {
        case long x when x < 1024:
            small.Post(file);
            break;
        case long x when x < 10240:
            medium.Post(file);
            break;
        default:
            big.Post(file);
            break;
    }
}
Or, in C# 8, with a switch expression :
foreach(var file in files)
{
    var block = file.Length switch {
        long x when x < 1024 => small,
        long x when x < 10240 => medium,
        _ => big
    };
    block.Post(file);
}
When iteration completes, we need to tell the blocks we are done by calling Complete() on each one, and then wait for all of them to finish :
small.Complete();
medium.Complete();
big.Complete();
await Task.WhenAll(small.Completion, medium.Completion, big.Completion);