0

I've have a recursive descent function that takes all of the files in the parent directory and all of the files from any number of child directories to send to AWS S3. I have a 5 min timeout, based from this post, set to let all of the files in the folder get pushed to S3 and if it takes longer than that I want to cancel any of the remaining tasks. While I'm setting the cancel flag for the token regardless of if the Delay or Wait of the WhenAny hit the timeout or not I want to be able take all of the tasks that didn't complete from the list and pull the details of the request for logging. Microsoft says the Id and CurrentId of the task can't be considered unique.

How can I get the request object that created the task from the task object?

private static void ProcessDirectory(System.IO.DirectoryInfo di)
{
    int _timeOut = 5 * 60 * 1000;
    foreach (var item in di.GetDirectories())
    {
        ProcessDirectory(item);
    }

    using (Amazon.S3.AmazonS3Client _client = new Amazon.S3.AmazonS3Client())
    {
        System.Threading.CancellationTokenSource _cancellationTokenSource = new System.Threading.CancellationTokenSource();
        System.Collections.Generic.List<System.Threading.Tasks.Task<Amazon.S3.Model.PutObjectResponse>> _responses = new List<System.Threading.Tasks.Task<Amazon.S3.Model.PutObjectResponse>>(1000);
        foreach (var item in di.GetFiles())
        {
            _responses.Add(_client.PutObjectAsync(new Amazon.S3.Model.PutObjectRequest
            {
                BucketName = SiteSettings.Bucket,
                CannedACL = Amazon.S3.S3CannedACL.PublicRead,
                FilePath = item.FullName,
                Key = item.FullName.Replace(SiteSettings.OutputRoot, string.Empty).Replace(@"\", "/")
            }, _cancellationTokenSource.Token));                    
        }
        // Wait 5 Mins + 1 sec
        System.Threading.Tasks.Task.WhenAny(System.Threading.Tasks.Task<Amazon.S3.Model.PutObjectResponse>.WhenAll(_responses)
            , System.Threading.Tasks.Task.Delay(_timeOut)).Wait(_timeOut + 1000);

        _cancellationTokenSource.Cancel(); //Cancel the remaining pushes for this folder.
        foreach (var item in _responses)
        {
            if (!item.IsCompleted)
            {
                //Pull the key value to log
            }
        }
    }
}
Aaron
  • 511
  • 3
  • 25
  • Your code would be a lot simpler and faster if you changed `ProcessDirectory` into an asynchronous method and just awaited individual PUTs in the loop. Trying to execute eg 100 PUTs at the same time will results in 100 slow uploads. For large files a single upload at a time would be faster than the 100 concurrent uploads. Other options would be to use `Parallel.For` with a limit to how many concurrent tasks can run at a time, or an ActionBlock with a DOP>1 – Panagiotis Kanavos Sep 05 '18 at 15:36
  • @PanagiotisKanavos can you explain why threading one item would be faster than multiple items? It seems to me that would just create a thread for the sake of creating a thread and instead of running in the foreground it gets put into the background to complete when ever the cpu has time to get to it vs having the context and cpu ready to run the code block. – Aaron Sep 05 '18 at 20:25

1 Answers1

1

You can save some unique key for every work item once you create it and then use that key for logging. In this example I used item.FullName as a key. Also I have taken a liberty to remove long namespaces before types for better readability, hope you wouldn't mind:

private static void ProcessDirectory(System.IO.DirectoryInfo di)
{
    int _timeOut = 5 * 60 * 1000;
    foreach (var item in di.GetDirectories())
    {
        ProcessDirectory(item);
    }

    using (Amazon.S3.AmazonS3Client _client = new Amazon.S3.AmazonS3Client())
    {
        CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
        Dictionary<string, Task<Amazon.S3.Model.PutObjectResponse>> _responses =
            new Dictionary<string, Task<Amazon.S3.Model.PutObjectResponse>>(1000);

        foreach (var item in di.GetFiles())
        {
            // use any unique information about your item here
            var itemName = item.FullName;
            _responses[itemName] = _client.PutObjectAsync(new Amazon.S3.Model.PutObjectRequest
            {
                BucketName = SiteSettings.Bucket,
                CannedACL = Amazon.S3.S3CannedACL.PublicRead,
                FilePath = itemName,
                Key = item.FullName.Replace(SiteSettings.OutputRoot, string.Empty).Replace(@"\", "/")
            }, _cancellationTokenSource.Token);
        }
        // Wait 5 Mins + 1 sec
        Task.WhenAny(Task<Amazon.S3.Model.PutObjectResponse>.WhenAll(_responses.Values)
            ,Task.Delay(_timeOut)).Wait(_timeOut + 1000);

        _cancellationTokenSource.Cancel(); //Cancel the remaining pushes for this folder.
        foreach (var item in _responses)
        {
            if (!item.Value.IsCompleted)
            {
                //Pull the key value to log
                var keyValue = item.Key;
            }
        }
    }
}

You see, I exchanged List<Task<Amazon.S3.Model.PutObjectResponse>> with Dictionary<string, Task<Amazon.S3.Model.PutObjectResponse>> where key is a full name of the file. So if some task in a dictionary doesn't finish in 5 minute, you will be able to get the name of the file, that was not loaded.

Hope it helps.

KozhevnikovDmitry
  • 1,660
  • 12
  • 27