Here is a TPL Dataflow approach. A BufferBlock<FtpClient>
is used as a pool of FtpClient
objects. The recursive enumeration takes a parameter of type IEnumerable<string>
that holds the segments of one filepath. These segments are combined differently when constructing the local and the remote filepath. As a side effect of invoking the recursive enumeration, the paths of the remote files are sent to an ActionBlock<IEnumerable<string>>
. This block handles the parallel downloading of the files. Its Completion
property contains eventually all the exceptions that may have occurred during the whole operation.
public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
string targetDirectory, string username = null, string password = null,
int maximumConnections = 1)
{
// Arguments validation omitted
if (!Directory.Exists(targetDirectory))
throw new DirectoryNotFoundException(targetDirectory);
var fsLocker = new object();
var ftpClientPool = new BufferBlock<FtpClient>();
async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
{
var client = await ftpClientPool.ReceiveAsync();
try { return await action(client); }
finally { ftpClientPool.Post(client); } // Return to the pool
}
var downloader = new ActionBlock<IEnumerable<string>>(async path =>
{
var remotePath = String.Join("/", path);
var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
var localDir = Path.GetDirectoryName(localPath);
lock (fsLocker) Directory.CreateDirectory(localDir);
var status = await UsingFtpAsync(client =>
client.DownloadFileAsync(localPath, remotePath));
if (status == FtpStatus.Failed) throw new InvalidOperationException(
$"Download of '{remotePath}' failed.");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = maximumConnections,
BoundedCapacity = maximumConnections,
});
async Task Recurse(IEnumerable<string> path)
{
if (downloader.Completion.IsCompleted) return; // The downloader has failed
var listing = await UsingFtpAsync(client =>
client.GetListingAsync(String.Join("/", path)));
foreach (var item in listing)
{
if (item.Type == FtpFileSystemObjectType.Directory)
{
if (item.Size != 0) await Recurse(path.Append(item.Name));
}
else if (item.Type == FtpFileSystemObjectType.File)
{
var accepted = await downloader.SendAsync(path.Append(item.Name));
if (!accepted) break; // The downloader has failed
}
}
}
// Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
return Task.Run(async () =>
{
// Fill the FtpClient pool
for (int i = 0; i < maximumConnections; i++)
{
var client = new FtpClient(ftpHost);
if (username != null && password != null)
client.Credentials = new NetworkCredential(username, password);
ftpClientPool.Post(client);
}
try
{
// Enumerate the files to download
await Recurse(new[] { ftpRoot });
downloader.Complete();
}
catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }
try
{
// Await the downloader to complete
await downloader.Completion;
}
catch (OperationCanceledException)
when (downloader.Completion.IsCanceled) { throw; }
catch { downloader.Completion.Wait(); } // Propagate AggregateException
finally
{
// Clean up
if (ftpClientPool.TryReceiveAll(out var clients))
foreach (var client in clients) client.Dispose();
}
});
}
Usage example:
await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
"username", "password", maximumConnections: 10);
Note: The above implementation enumerates the remote directory lazily, following the tempo of the downloading process. If you prefer to enumerate it eagerly, gathering all info available about the remote listings ASAP, just remove the BoundedCapacity = maximumConnections
configuration from the ActionBlock
that downloads the files. Be aware that doing so could result in high memory consumption, in case the remote directory has a deep hierarchy of subfolders, containing cumulatively a huge number of small files.