
I think the title I've given is a bit confusing, but it's hard to express what I'm trying to ask.

Basically, I am writing a C#/.NET program that uses the Google Cloud API to upload data.

I am trying to do this efficiently and have used `Parallel.ForEach` with success, but I need finer control. I collect the files to be uploaded into one list, which I want to sort by file size and then split into, say, 3 equally sized lists (equal in terms of gigabytes, not file count).

The first list would contain roughly a third of the total upload size but be made up of the largest files (and therefore the fewest files); the second would total the same number of gigabytes but consist of a greater number of smaller files; and the last would consist of many, many small files while still totalling the same size as the other two lists.

I then want to assign a set number of threads to each upload process (e.g. I want the large-file list to have 5 threads assigned, the middle one to have 3, and the small-file list only 2 threads). Is it possible to set up these 3 lists to be iterated over in parallel, while controlling how many threads are allocated to each?

What is the best method to do so?

LJLRD
  • Split into those 3 lists, then run Parallel.ForEach on each, specifying a different concurrency limit for them using the MaxDegreeOfParallelism setting. See https://stackoverflow.com/questions/9290498/how-can-i-limit-parallel-foreach for more information. – Lasse V. Karlsen Feb 28 '20 at 10:52
  • `Parallel.ForEach` is meant for data parallelism - crunching a lot of *local* data by partitioning it into as many batches as cores, and using a separate worker task per partition. It looks like what you do is *concurrent* processing of multiple files, a completely different thing. And `files to be uploaded` is asynchronous processing, which is what `async/await` do. Again, a different thing. – Panagiotis Kanavos Feb 28 '20 at 10:53
  • I'd suggest using DataFlow blocks like [ActionBlock](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1?view=netcore-3.1) and [TransformBlock](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.transformblock-2?view=netcore-3.1) to break processing into a pipeline of concrete steps that can have their own DOP, their own code as needed. For IO tasks, you can use `async` worker methods and use the DOP setting to control the number of concurrent uploads. – Panagiotis Kanavos Feb 28 '20 at 10:57
  • For data crunching, you can set the DOP to eg 1, and use `PLINQ` or `Parallel.ForEach` in the worker to crunch the data (with its own DOP limit, otherwise the *other* blocks won't be able to run). Or you may be able to break the data crunching task into separate operations too. – Panagiotis Kanavos Feb 28 '20 at 10:58
  • I also have a bunch of relatively big files to download, parse and import into a database. I do that with a block that downloads the files asynchronously, a second block that parses them *synchronously* with a limited DOP, and a final block that imports them into the database using SqlBulkCopy. This way I can be downloading, parsing and importing at the same time (a sketch of this kind of pipeline follows these comments). – Panagiotis Kanavos Feb 28 '20 at 11:00
  • In your case, you could use different pipelines for different types of files and send files to different blocks/pipelines based on some criteria. – Panagiotis Kanavos Feb 28 '20 at 11:02
  • I agree with @PanagiotisKanavos - It sounds like a job for DataFlow. – Matthew Watson Feb 28 '20 at 11:04
  • What is the reason that you want to split your workload into three lists, and process each list with a specific degree of parallelism? Do you think that by doing so you will process your workload faster, or is this a hard requirement imposed on you by your service provider (the Google Cloud API)? – Theodor Zoulias Feb 28 '20 at 13:49
  • The idea is to have fewer threads handling files that upload quickly, to get them out of the way, while having many threads handling the upload of larger files. So while 5 large 4 GB files are uploading on 5 threads, at the same time 1 thread can be powering through 4 GB worth of little 50 MB files, for instance. I could be wrong, if I'm honest. – LJLRD Feb 29 '20 at 19:57
  • My instinct is that the limiting factor of your workload is the upstream bandwidth of your internet connection. In case I am right, there shouldn't be any difference between any uploading strategies you may employ. Even uploading the files sequentially, one after the other, should be as fast as (or even faster than) the most sophisticated partitioning mechanism anyone could devise. – Theodor Zoulias Feb 29 '20 at 22:04
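To make the pipeline idea from these comments concrete, here is a minimal sketch of a download/parse/import Dataflow pipeline. DownloadAsync, Parse, BulkImportAsync and the Record type are hypothetical placeholders, stubbed out for illustration:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Each block has its own DOP, and all three stages run concurrently:
// new files download while earlier ones are parsed and imported.
var download = new TransformBlock<Uri, string>(
    uri => DownloadAsync(uri),           // IO-bound: generous DOP
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var parse = new TransformBlock<string, Record[]>(
    path => Parse(path),                 // CPU-bound: limited DOP
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

var import = new ActionBlock<Record[]>(
    rows => BulkImportAsync(rows),       // e.g. SqlBulkCopy: DOP 1
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// PropagateCompletion lets Complete() on the first block flow downstream,
// so awaiting import.Completion waits for the whole pipeline.
var link = new DataflowLinkOptions { PropagateCompletion = true };
download.LinkTo(parse, link);
parse.LinkTo(import, link);

// Hypothetical helpers, stubbed for illustration:
static Task<string> DownloadAsync(Uri uri) => Task.FromResult("local.tmp");
static Record[] Parse(string path) => Array.Empty<Record>();
static Task BulkImportAsync(Record[] rows) => Task.CompletedTask;
record Record;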

2 Answers


Parallel.ForEach and PLINQ are meant for data parallelism - processing big chunks of data using multiple cores. They suit scenarios where you have, e.g., 1 GB of data in memory (or a very fast IEnumerable source) and want to process it using all cores. In such scenarios, you need to partition the data into independent chunks and have each worker crunch one chunk at a time, to limit the synchronization overhead.
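To make the distinction concrete, here is a minimal sketch of that data-parallel scenario; Compute is a hypothetical stand-in for real CPU-bound work:

using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] data = Enumerable.Range(0, 1_000_000).ToArray();
long total = 0;

// Partition the in-memory array into ranges; each worker crunches its own
// chunk with a thread-local accumulator and merges only once at the end.
Parallel.ForEach(
    Partitioner.Create(0, data.Length),
    () => 0L,                                    // per-worker local state
    (range, _, local) =>
    {
        for (int i = range.Item1; i < range.Item2; i++)
            local += Compute(data[i]);
        return local;
    },
    local => Interlocked.Add(ref total, local)); // one merge per worker

static long Compute(int x) => (long)x * x;       // stand-in for real CPU work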

What you describe, though, is concurrent uploading of a large number of files. That's pure IO, not data parallelism: most of the time will be spent loading the data from disk or writing it to the network. This is a job for Task.Run and async/await. To upload multiple files concurrently, you could use an ActionBlock or a Channel to queue the files and upload them asynchronously. With channels you have to write a bit of worker boilerplate, but you get greater control, especially in cases where you want to use e.g. the same client instance for multiple calls. An ActionBlock is essentially stateless.

Finally, you describe queues with different DOP based on size, which is a very nice idea when you have both big and small files. You can do that by using multiple ActionBlock instances, each with a different DOP, or multiple Channel workers, each with a different DOP.
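For comparison, here is a minimal sketch of the Channel-based approach; it assumes the same service, files and UploadFile names used in the Dataflow example below:

using System.IO;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<FileInfo>();

// A fixed number of worker tasks drain the channel;
// the worker count is the effective DOP for this queue.
var workers = Enumerable.Range(0, 5)
    .Select(_ => Task.Run(async () =>
    {
        await foreach (var file in channel.Reader.ReadAllAsync())
            await UploadFile(service, file);
    }))
    .ToArray();

// Producer side: queue the files, then signal that no more are coming.
foreach (var file in files)
    await channel.Writer.WriteAsync(file);
channel.Writer.Complete();

await Task.WhenAll(workers);

Three such channels, each drained by a different number of workers, would mirror the three ActionBlock instances shown below.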

Dataflows

Let's say you already have a method that uploads a single file:

// Adapted from the Google Drive SDK example
async Task UploadFile(DriveService service, FileInfo file)
{
    using var uploadStream = file.OpenRead();
    var insertRequest = service.Files.Insert(
        new File { Title = file.Name },
        uploadStream,
        "image/jpeg");

    await insertRequest.UploadAsync();
}

You can create three different ActionBlock instances, each with a different DOP:

var small = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 15
    });

var medium = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    });

var big = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2
    });

And post different files to different blocks based on size:

var directory = new DirectoryInfo(...);
var files = directory.EnumerateFiles(...);
foreach (var file in files)
{
    switch (file.Length)
    {
        case long x when x < 1024:
            small.Post(file);
            break;
        case long x when x < 10240:
            medium.Post(file);
            break;
        default:
            big.Post(file);
            break;
    }
}

Or, in C# 8:

foreach (var file in files)
{
    var block = file.Length switch
    {
        long x when x < 1024  => small,
        long x when x < 10240 => medium,
        _                     => big
    };
    block.Post(file);
}

When iteration completes, we need to tell the blocks we are done by calling Complete() on each one, then wait for all of them to finish:

small.Complete();
medium.Complete();
big.Complete();

await Task.WhenAll(small.Completion, medium.Completion, big.Completion);
Panagiotis Kanavos
  • Also, by awaiting the completion of all blocks with `Task.WhenAll`, in case of an exception in one of them you will delay the propagation of the exception until the completion of the others. A more sophisticated solution would include a `CancellationTokenSource` that would cancel all blocks in case of an exception in one of them. – Theodor Zoulias Feb 28 '20 at 14:10
  • Question: is there a theoretical justification for why it is a good idea to split a workload by size into multiple queues with a different DOP per queue? Will you achieve faster overall processing this way? Why so? – Theodor Zoulias Feb 28 '20 at 14:15
  • Brilliant. I will try to get this implemented tonight and run some tests! Thanks a lot. – LJLRD Feb 29 '20 at 19:58
  • Question: Once all the larger files are complete, I would ideally want to reallocate the threads so that the maximum number of threads is then allocated to the smaller files. That is to say, I want the maximum number of threads running at all times. How would you do this? And what is the best way to queue up the uploads, just take the list of all files and run a foreach? – LJLRD Mar 02 '20 at 11:29
  • @LJLRD great question, which boils down to "can I modify the execution parameters at runtime" and "How do I know category X has finished"? How would you know the "big" files have finished? – Panagiotis Kanavos Mar 03 '20 at 09:14
  • @LJLRD I was thinking about more complex solutions but a *very* simple solution would be to simply change the target in the switch statement and send smaller files to the other blocks. – Panagiotis Kanavos Mar 04 '20 at 07:49

Here is another idea. You could have a single list, but upload the files with a dynamic degree of parallelism. This would be easy to implement if the SemaphoreSlim class had a WaitAsync overload that could reduce the CurrentCount by a value other than 1. You could then initialize the SemaphoreSlim with a large initialCount like 1000, and call WaitAsync with a value relative to the size of each file. Let's call this value weight. The semaphore would guarantee that the combined weight of all files currently being uploaded would not exceed 1000. This could be a single huge file with a weight of 1000, or 10 medium files each weighing 100, or a mix of small, medium and large files with a total weight around 1000. The degree of parallelism would constantly change depending on the weight of the next file in the list.

This is an example of the code that you'd have to write:

var semaphore = new SemaphoreSlim(1000);
var tasks = Directory.GetFiles(@"D:\FilesToUpload")
    .Select(async filePath =>
    {
        var fi = new FileInfo(filePath);
        var weight = (int)Math.Min(1000, fi.Length / 1_000_000);
        await semaphore.WaitAsync(weight); // Imaginary overload that accepts weight
        try
        {
            await cloudService.UploadFile(filePath);
        }
        finally
        {
            semaphore.Release(weight);
        }
    })
    .ToArray();
await Task.WhenAll(tasks);

Below is a custom AsyncSemaphorePlus class that provides the missing overload. It is based on Stephen Toub's AsyncSemaphore class from the blog post Building Async Coordination Primitives, Part 5: AsyncSemaphore, slightly modernized with features like Task.CompletedTask and TaskCreationOptions.RunContinuationsAsynchronously, which were not available at the time the blog post was written.

public class AsyncSemaphorePlus
{
    private readonly object _locker = new object();
    private readonly Queue<(TaskCompletionSource<bool>, int)> _queue
        = new Queue<(TaskCompletionSource<bool>, int)>();
    private int _currentCount;

    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    public AsyncSemaphorePlus(int initialCount)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        _currentCount = initialCount;
    }

    public Task WaitAsync(int count)
    {
        lock (_locker)
        {
            if (_currentCount - count >= 0)
            {
                _currentCount -= count;
                return Task.CompletedTask;
            }
            else
            {
                var tcs = new TaskCompletionSource<bool>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
                _queue.Enqueue((tcs, count));
                return tcs.Task;
            }
        }
    }

    public void Release(int count)
    {
        lock (_locker)
        {
            _currentCount += count;
            while (_queue.Count > 0)
            {
                var (tcs, weight) = _queue.Peek();
                if (weight > _currentCount) break;
                (tcs, weight) = _queue.Dequeue();
                _currentCount -= weight;
                tcs.SetResult(true);
            }
        }
    }
}

Update: This approach is intended for uploading a medium to large number of files. It is not suitable for an extremely large number of files, because all uploading tasks are created upfront. If, say, 100,000,000 files have to be uploaded, then the memory required to store the state of all these tasks may exceed the RAM available on the machine. For uploading that many files, the solution proposed by Panagiotis Kanavos is probably preferable, because it can easily be modified to use bounded dataflow blocks fed with SendAsync instead of Post, so that the memory required for the whole operation is kept under control.
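For reference, here is a minimal sketch of that bounded variant, reusing the UploadFile method and names from the other answer; the capacity and DOP values are arbitrary:

using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10,
        BoundedCapacity = 100 // at most 100 files buffered at any time
    });

foreach (var file in new DirectoryInfo(@"D:\FilesToUpload").EnumerateFiles())
{
    // SendAsync waits asynchronously while the block is full;
    // Post would return false and reject the file instead.
    await block.SendAsync(file);
}

block.Complete();
await block.Completion;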

Theodor Zoulias
  • If you want to limit the number of tasks, the correct way is a custom TaskScheduler to control how many tasks run at the same time. There's no need to add this kind of throttling after the fact. – Panagiotis Kanavos Mar 03 '20 at 08:20
  • As for a single semaphore pool, the problem is starvation. It's the same problem faced by OS thread scheduling. Big jobs can take over the thread, never letting small jobs run. The end result is both unfair and inefficient. That's why OS thread schedulers have ways of boosting thread priorities for small jobs, or use different priority queues, like the OP tries to do – Panagiotis Kanavos Mar 03 '20 at 08:25
  • The `TaskScheduler` [can't be used](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536) for throttling tasks created by `async` methods. It can only be used to throttle CPU-bound tasks (from the pre-async era). – Theodor Zoulias Mar 03 '20 at 08:26
  • Why? A task isn't CPU bound, it's just a promise. `async/await` doesn't change how the TPL works, only how awaiting the promise works. You can use a custom task scheduler *before* starting the async calls (just like your code does). In fact, you'll find that the custom task scheduler examples in the docs are quite similar to your code – Panagiotis Kanavos Mar 03 '20 at 08:30
  • About the problem of starvation: the `AsyncSemaphorePlus` grants access to the semaphore using a FIFO queue. The jobs will run in the same order they called the `WaitAsync` method. – Theodor Zoulias Mar 03 '20 at 08:31
  • That's why it's prone to starvation and unfairness. – Panagiotis Kanavos Mar 03 '20 at 08:31
  • @PanagiotisKanavos I challenge you to make a custom `TaskScheduler` that can throttle tasks created by async methods. [I have tried and failed](https://stackoverflow.com/questions/15428604/how-to-run-a-task-on-a-custom-taskscheduler-using-await/57702536#57702536). Show me how it's done! – Theodor Zoulias Mar 03 '20 at 08:34
  • There is no unfairness with a FIFO queue. Unfair is when a plethora of small jobs indefinitely prevents a big job from ever running. – Theodor Zoulias Mar 03 '20 at 08:39
  • @PanagiotisKanavos try configuring an async dataflow block with both `MaxDegreeOfParallelism` and `TaskScheduler`. Can you guess who is going to win? The [`ExclusiveScheduler`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.concurrentexclusiveschedulerpair.exclusivescheduler) or the parallelism? You can see the answer [here](https://dotnetfiddle.net/MbrNDw). – Theodor Zoulias Mar 04 '20 at 00:09
  • I don't know enough to comment on this but I appreciate both of your responses and ideas on the topic haha – LJLRD Mar 04 '20 at 10:52