
I want to upload potentially large batches (possibly hundreds) of files over SFTP, using the SSH.NET library and the Renci.SshNet.Async extensions. I need to limit the number of concurrent uploads to five, or whatever number I find the server can handle.

This is my code before any limiting:

using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
    var tasks = new List<Task>();
    try
    {
        sftp.Connect();

        foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
        {
            tasks.Add(
                sftp.UploadAsync(
                    File.OpenRead(file),      // Stream input
                    Path.GetFileName(file),   // string path
                    true));                   // bool canOverride
        }

        await Task.WhenAll(tasks);
        sftp.Disconnect();
    }
    // trimmed catch
}

I've read about SemaphoreSlim, but I don't fully understand how it works or how it's used with TAP. Based on the MSDN documentation, this is how I would implement it.

I'm unsure if using Task.Run is the correct way to go about this, as it's I/O bound, and from what I know, Task.Run is for CPU-bound work and async/await for I/O-bound work. I also don't understand how these tasks enter (is that the correct terminology) the semaphore, as all they do is call .Release() on it.

using (var sftp = new SftpClient(sftpHost, 22, sftpUser, sftpPass))
{
    var tasks = new List<Task>();
    var semaphore = new SemaphoreSlim(5);
    try
    {
        sftp.Connect();

        foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
        {
            tasks.Add(
                Task.Run(() =>
                {
                    sftp.UploadAsync(
                        File.OpenRead(file),      // Stream input
                        Path.GetFileName(file),   // string path
                        true);                    // bool canOverride
                    semaphore.Release();
                }));
        }

        await Task.WhenAll(tasks);
        sftp.Disconnect();
    }
    // trimmed catch
}
CarenRose
  • Have you looked into ThreadPools? Specifically [ThreadPool.SetMaxThreads](https://learn.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setmaxthreads?view=netframework-4.7.2#System_Threading_ThreadPool_SetMaxThreads_System_Int32_System_Int32_) – DotNetPadawan Mar 21 '19 at 19:55
  • @DotNetPadawan As is mentioned in the question, this is not CPU bound work. It won't be using the thread pool for the actual long running operation. – Servy Mar 21 '19 at 21:19
  • Possible duplicate of [Have a set of Tasks with only X running at a time](https://stackoverflow.com/questions/14075029/have-a-set-of-tasks-with-only-x-running-at-a-time) – Cory Nelson Mar 22 '19 at 15:30

1 Answer


from what I know, Task.Run is for CPU-bound work

Correct.

and async/await for I/O-bound work.

No. await is a tool for adding continuations to an asynchronous operation. It doesn't care what that asynchronous operation actually is. It simply makes it easier to compose asynchronous operations of any kind together.

If you want to compose several asynchronous operations together, you do that by writing an async method, using the various asynchronous operations, awaiting them when you need their results (or need them to be completed), and then using the Task from that method as its own new asynchronous operation.

In your case, the new asynchronous operation simply needs to await the semaphore, upload the file, and then release the semaphore.

async Task UploadFile(string file)
{
    // Wait for a free slot; the SemaphoreSlim caps how many uploads run at once.
    await semaphore.WaitAsync();
    try
    {
        await sftp.UploadAsync(
            File.OpenRead(file),
            Path.GetFileName(file),
            true);
    }
    finally
    {
        // Always release the slot, even if the upload throws.
        semaphore.Release();
    }
}

Now you can simply call that method for each file.
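For example, here is a minimal sketch of the calling loop (it assumes UploadFile takes the file path as a parameter, as above, and that sftp and semaphore are visible to it, e.g. as captured locals or fields):

var semaphore = new SemaphoreSlim(5);
var tasks = new List<Task>();

sftp.Connect();

foreach (var file in Directory.EnumerateFiles(localPath, "*.xml"))
{
    // Each call throttles itself via the semaphore, so simply start them all.
    tasks.Add(UploadFile(file));
}

await Task.WhenAll(tasks);
sftp.Disconnect();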

Additionally, because this is such a common operation, you may find it worthwhile to create a small class that handles this logic: you make a queue, add items to it, and let it do the throttling internally, rather than replicating that mechanic everywhere you use it.
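As a rough illustration of that idea (the class name and API below are hypothetical, not something provided by SSH.NET or the BCL):

using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class ThrottledWorkQueue
{
    private readonly SemaphoreSlim _semaphore;

    public ThrottledWorkQueue(int maxConcurrency)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency);
    }

    // Runs the supplied asynchronous work item, allowing at most
    // maxConcurrency items to be in flight at any one time.
    public async Task EnqueueAsync(Func<Task> work)
    {
        await _semaphore.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

Callers then just write tasks.Add(queue.EnqueueAsync(() => sftp.UploadAsync(...))) and the throttling logic lives in one place.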

Servy