
My input is a long list of files located on an Amazon S3 server. I'd like to download the metadata of the files, compute the hashes of the local files, and compare the metadata hash with the local files' hash.

Currently, I use a loop to start all the metadata downloads asynchronously, then as each completes, compute MD5 on the local file if needed and compare. Here's the code (just the relevant lines):

Dim s3client As New AmazonS3Client(KeyId.Text, keySecret.Text)
Dim responseTasks As New List(Of System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse)))
For Each lvi As ListViewItem In lvStatus.Items
    Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
    gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
    gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)
    responseTasks.Add(New System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse))(lvi, s3client.GetObjectMetadataAsync(gomr)))
Next
For Each t As System.Tuple(Of ListViewItem, Task(Of GetObjectMetadataResponse)) In responseTasks
    Dim lvi As ListViewItem = t.Item1
    Dim response As GetObjectMetadataResponse = Await t.Item2
    ' The ETag comes back wrapped in double quotes, so strip them before comparing.
    If response.ETag.Trim(""""c) = MD5CalcFile(lvi.SubItems(1).Text) Then
        lvi.SubItems(3).Text = "Match"
        UpdateLvi(lvi)
    End If
Next

I've got two problems:

  1. I'm awaiting the responses in the order that I requested them. I'd rather process them in the order that they complete so that I get results sooner.

  2. The MD5 calculation is long and synchronous. I tried making it async but the process locked up. I think that the MD5 task was added to the end of .NET's task queue and didn't get to run until all the downloads had completed.

Ideally, I would process the responses as they arrive, not in request order, and the MD5 would be asynchronous but still get a chance to run.

Edit:

Incorporating WhenAny, it looks like this now:

Dim s3client As New Amazon.S3.AmazonS3Client(KeyId.Text, keySecret.Text)
Dim responseTasks As New Dictionary(Of Task(Of GetObjectMetadataResponse), ListViewItem)
For Each lvi As ListViewItem In lvStatus.Items
    Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
    gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
    gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)
    responseTasks.Add(s3client.GetObjectMetadataAsync(gomr), lvi)
Next
Dim startTime As DateTimeOffset = DateTimeOffset.Now
Do While responseTasks.Count > 0
    Dim currentTask As Task(Of GetObjectMetadataResponse) = Await Task.WhenAny(responseTasks.Keys)
    ' Look up the item for the finished task, then remove the task so the loop terminates.
    Dim lvi As ListViewItem = responseTasks(currentTask)
    responseTasks.Remove(currentTask)
    Dim response As GetObjectMetadataResponse = Await currentTask
    If response.ETag.Trim(""""c) = MD5CalcFile(lvi.SubItems(1).Text) Then
        lvi.SubItems(3).Text = "Match"
        UpdateLvi(lvi)
    End If
Loop
MsgBox((DateTimeOffset.Now - startTime).ToString)

The UI locks up momentarily whenever MD5CalcFile runs. The whole loop takes about 45s and the first file's MD5 result arrives within 1s of starting.

If I change the line to:

        If response.ETag.Trim(""""c) = Await Task.Run(Function () MD5CalcFile(lvi.SubItems(1).Text)) Then

The UI doesn't lock up when MD5CalcFile runs. The whole loop takes about 75s, up from 45s, and the first file's MD5 result arrives only after 40s of waiting.

Edit2:

I found a solution that works for me. The problem was in my GetObjectMetadataAsync. I wrote it wrong. The correct version with the wrong one in comments is below:

<System.Runtime.CompilerServices.Extension>
Function GetObjectMetadataAsync(a As AmazonS3Client, l As GetObjectMetadataRequest) As Task(Of GetObjectMetadataResponse)
    Return Task.Factory.FromAsync(AddressOf a.BeginGetObjectMetadata, AddressOf a.EndGetObjectMetadata, l, Nothing)
    'Return Task.Run(Function()
    '                    Try
    '                        Return a.GetObjectMetadata(l)
    '                    Catch ex As Amazon.S3.AmazonS3Exception
    '                        If ex.ErrorCode = "NoSuchKey" Then
    '                            Return Nothing
    '                        Else
    '                            Throw ex
    '                        End If
    '                    End Try
    '                End Function)
End Function

I don't know why it should matter if I put a synchronous version into a thread or use FromAsync, but clearly the latter is better looking and the testing shows it to be much faster.

cuongle
Eyal

2 Answers


You can use WhenAny to process task results as they complete:

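// Assumes responseTasks is a List<Task<GetObjectMetadataResponse>>, so Count and Remove are available.
while (responseTasks.Count > 0)
{
  var completedTask = await Task.WhenAny(responseTasks);
  responseTasks.Remove(completedTask);
  var response = await completedTask;
  ...
}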

(sorry for the C#; it's been way too long for my VB syntax to be correct).

For a complete discussion of the topic, see Stephen Toub's post on the subject.
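
The WhenAny loop above re-registers a continuation on every remaining task each time around. One common way to avoid that is to attach a single continuation per task and hand back a set of "bucket" tasks that finish in completion order. The following is a minimal sketch of that idea, not necessarily the code from the linked post; `InCompletionOrder`, `DelayedValue`, and the module name are hypothetical names chosen for this illustration.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading
Imports System.Threading.Tasks

Module CompletionOrderSketch

    ' Returns bucket tasks that complete in the order in which the inputs finish.
    ' Each input gets exactly one continuation, so the bookkeeping is O(n) overall.
    Function InCompletionOrder(Of T)(tasks As IEnumerable(Of Task(Of T))) As Task(Of Task(Of T))()
        Dim inputs As List(Of Task(Of T)) = tasks.ToList()
        Dim buckets(inputs.Count - 1) As TaskCompletionSource(Of Task(Of T))
        For i As Integer = 0 To buckets.Length - 1
            buckets(i) = New TaskCompletionSource(Of Task(Of T))()
        Next

        Dim nextIndex As Integer = -1
        For Each source As Task(Of T) In inputs
            source.ContinueWith(
                Sub(completed As Task(Of T))
                    ' The first input to finish claims bucket 0, the second bucket 1, and so on.
                    Dim slot As Integer = Interlocked.Increment(nextIndex)
                    buckets(slot).TrySetResult(completed)
                End Sub,
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default)
        Next

        Return buckets.Select(Function(b As TaskCompletionSource(Of Task(Of T))) b.Task).ToArray()
    End Function

    ' Hypothetical stand-in for a download: completes after the given delay.
    Async Function DelayedValue(ms As Integer) As Task(Of Integer)
        Await Task.Delay(ms)
        Return ms
    End Function

    Sub Main()
        Dim work As New List(Of Task(Of Integer))
        For Each ms As Integer In New Integer() {300, 100, 200}
            work.Add(DelayedValue(ms))
        Next
        ' Buckets are consumed in array order, which matches completion order.
        For Each bucket As Task(Of Task(Of Integer)) In InCompletionOrder(work)
            Console.WriteLine(bucket.Result.Result)
        Next
    End Sub

End Module
```

In the question's code this would presumably be called as `InCompletionOrder(responseTasks.Keys)`, awaiting each bucket in turn and then awaiting the inner task to obtain the response (or observe its exception).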

Another option is TPL Dataflow, which allows you to build a "mesh" for the data to go through. For this example, Dataflow might be overkill, but it's useful if your real-world processing is more complex.

As far as the MD5 goes, making it asynchronous shouldn't be a problem. Tasks based on asynchronous I/O (such as the ones returned by GetObjectMetadataAsync) do not consume thread pool threads. I'd try a few other scenarios (like running MD5 asynchronously by itself) and then post another question if nothing obvious turns up.
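
For testing the MD5 step in isolation, a small stand-alone hash routine can help. The question does not show `MD5CalcFile`, so the sketch below is an assumed equivalent: `Md5Hex` and `Md5HexAsync` are hypothetical names, and the lowercase hex format is an assumption about what the ETag comparison expects.

```vbnet
Imports System
Imports System.IO
Imports System.Security.Cryptography
Imports System.Threading.Tasks

Module Md5Sketch

    ' Synchronously hashes a file and returns the digest as lowercase hex.
    Function Md5Hex(filePath As String) As String
        Using hasher As MD5 = MD5.Create()
            Using stream As FileStream = File.OpenRead(filePath)
                Dim hash As Byte() = hasher.ComputeHash(stream)
                Return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant()
            End Using
        End Using
    End Function

    ' Queues the hash to the thread pool so that the caller's thread is not blocked.
    Function Md5HexAsync(filePath As String) As Task(Of String)
        Return Task.Run(Function() Md5Hex(filePath))
    End Function

End Module
```

Calling `Await Md5HexAsync(somePath)` from a button handler with no downloads in flight would show whether the hash itself starts promptly on the thread pool.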

Stephen Cleary
  • The solution to #1 works great, but I'm still having trouble with MD5. To do MD5 asynchronously, I wrote "Await Task.Run(Function () MD5(...))" where it was synchronous before. It runs correctly but the first MD5 doesn't complete for a long time, maybe because of the ordering of the Tasks? – Eyal Sep 14 '12 at 19:42
  • Are you sure it's the MD5 operation that you're measuring? Maybe the first *task* (download) doesn't complete for a long time? – Stephen Cleary Sep 14 '12 at 19:44
  • I switched to your while(responseTasks.Length>0) suggestion and record Date.Now() before the start of the While loop and after exiting the loop. With synchronous MD5, MD5 starts immediately on the UI context and the total time is 45s. With async MD5, total time is 75s. Using breakpoints, I can see that the first MD5 doesn't start for a long time in the async version, maybe due to task ordering? – Eyal Sep 14 '12 at 19:54
  • The code sample is an O(n^2) algorithm. The linked post contains a better solution. – usr Sep 14 '12 at 20:11
  • @usr: I don't believe that that is the issue. The list is just 1000 items. The issue is that the first MD5 doesn't start for 20-30 seconds. – Eyal Sep 14 '12 at 20:14
  • I'm not quite following it. Are you saying the first MD5 (when run synchronously) actually begins executing immediately? Before the first download is complete? – Stephen Cleary Sep 14 '12 at 20:21
  • Every call to `Task.WhenAny` will add up to 1000 continuations. And you are calling `Task.WhenAny` 1000 times. That's O(n^2). Stephen Toub's post says the same. – usr Sep 14 '12 at 20:30
  • Stephen, I'm saying that when I do MD5 synchronously, the first MD5 begins as soon as the first download completes and it finishes quickly. When I do it asynchronously, the first call to MD5 happens quickly but the result isn't ready for about 40s. I think that the scheduler is doing a FIFO and the MD5 thread isn't getting run until the queue has more room. – Eyal Sep 14 '12 at 21:00
  • usr, the slow down occurs after the first call to WhenAny. Even if I exit the loop after just a single call to WhenAny, it's still 40s compared to 1s. – Eyal Sep 14 '12 at 21:01
  • @Eyal: If you start an MD5 via `Task.Run`, it will be immediately queued to the thread pool. It does not wait for the download tasks to complete (which don't even run on the thread pool). – Stephen Cleary Sep 14 '12 at 21:36
  • I found a solution but it was outside of the functions that I wrote. I still don't see why FromAsync should be better than putting the synchronous version in a Task.Run() wrapper but the testing shows it to perform better in every way. – Eyal Sep 14 '12 at 21:39

Maybe using Task.ContinueWith would help instead of using await. It could also help make the code a little more straightforward than using await inside of these loops.

Note:

Obviously I don't have the same datatypes available to me that you do, so I worked up my example using basic data types and debug statements and then tried to retrofit your code example into it. You may need to tweak the syntax slightly.

Edit:

I updated my example to include passing the synchronization context to ContinueWith to ensure the callback happens on the UI thread (assuming this calling code is happening on the UI thread).

Reference: Task continuation on UI thread

...

Dim s3client As New AmazonS3Client(KeyId.Text, keySecret.Text)

For Each lvi As ListViewItem In lvStatus.Items

    Dim currentListItem = lvi

    Dim gomr As New Amazon.S3.Model.GetObjectMetadataRequest
    gomr.BucketName = S3FileDialog.GetBucketName(lvi.SubItems(2).Text)
    gomr.Key = S3FileDialog.GetPrefix(lvi.SubItems(2).Text)

    ' Pass this scheduler to ContinueWith so that the callback is executed on the UI thread.
    Dim context = TaskScheduler.FromCurrentSynchronizationContext()
    Dim t = s3client.GetObjectMetadataAsync(gomr)
    t.ContinueWith(Sub(task) OnDownloadComplete(task.Result, currentListItem), context)
    ' No t.Start() here: the task returned by GetObjectMetadataAsync is already running, and calling Start on it would throw.

Next

...

Private Sub OnDownloadComplete(response As GetObjectMetadataResponse, item As ListViewItem)
    If response.ETag.Trim(""""c) = MD5CalcFile(item.SubItems(1).Text) Then
        item.SubItems(3).Text = "Match"
        UpdateLvi(item)
    End If
End Sub
mclark1129
  • I thought about that but ContinueWith runs in a new context and I can't update the item from that new context because it isn't the UI thread. I can use BeginInvoke but it just gets messier. – Eyal Sep 14 '12 at 21:30
  • I believe you could pass `TaskScheduler.FromCurrentSynchronizationContext()` into an overload for `ContinueWith` to keep it on the UI thread. – mclark1129 Sep 14 '12 at 21:35