0

I have a method in an AWS Lambda which receives a completed list of items. It processes those items and then inserts them into a Dynamo DB table. Order of processing or inserting into the DB does not matter.

I'm looking for the most thread-safe, yet easy to understand manner in which to make this run faster; Either by using async await or something other (likely parallel?) operations.

I was thinking Parallel.ForEach() but that seems a bit heavy. Is there a simpler, more obvious way?

private async Task<int> LoadAutocomplete(IList<Item> resp)
{
    var client = new AmazonDynamoDBClient();

    foreach (var item in resp)
    {
        var request = new PutItemRequest
        {
            TableName = EnvironmentHelper.DynamoTableName,
            Item = new Dictionary<string, AttributeValue>()
            {
                { "LANGUAGE", new AttributeValue { S = item.LANGUAGE }},
                { "COUNTRY", new AttributeValue { S = item.COUNTRY }}
            }
        };
        await client.PutItemAsync(request);

        System.Threading.Thread.Sleep(100);
    }
}

Option 1

private async Task<int> LoadAutocomplete(IList<Item> resp)
{
    var client = new AmazonDynamoDBClient();

    Parallel.ForEach(resp, async item =>
    {
        var request = new PutItemRequest
        {
            TableName = EnvironmentHelper.DynamoTableName,
            Item = new Dictionary<string, AttributeValue>()
            {
                { "LANGUAGE", new AttributeValue { S = item.LANGUAGE }},
                { "COUNTRY", new AttributeValue { S = item.COUNTRY }}
            }
        };
        await client.PutItemAsync(request);
    }
}

Option 2 causes the compiler to complain that the LoadAutoComplete method "lacks await operators and will run synchronously".

Option 2 As suggested by @jamesfaix

private async Task<int> LoadAutocomplete(IList<Item> resp)
{
    var client = new AmazonDynamoDBClient();
    
    var tasks = items.Select(x => DoSomethingAsync(client, x)).ToList();
    
    await Task.WhenAll(tasks);
}

private DoSomething(AmazonDynamoDBClient client, Item item)
{
    var request = new PutItemRequest
    {
        TableName = EnvironmentHelper.DynamoTableName,
        Item = new Dictionary<string, AttributeValue>()
        {
            { "LANGUAGE", new AttributeValue { S = item.LANGUAGE }},
            { "COUNTRY", new AttributeValue { S = item.COUNTRY }}
        }
    };
    
    await client.PutItemAsync(request);
}

Options + Suggested by @martin

The post "Use Parallel.For in batches in dotnet core" does respond to my question, but I've chosen the answer posted by @jamesfaix because it has improved my code so much.

Matt W
  • 11,753
  • 25
  • 118
  • 215
  • 4
    Don't use `Sleep` in async code. Use `await Task.Delay` – JamesFaix Jun 10 '21 at 20:11
  • Ah, yes. I've actually removed that line but forgot to here. I might be scuppered by it anyway, because I seem to remember needing that line to ensure the DB inserts perform properly. – Matt W Jun 10 '21 at 20:13
  • Another thing you can do is start all tasks before awaiting any of them. The runtime will try to optimize their execution based on available threads. For example ``` var tasks = items.Select(x => DoSomethingAsync(x)).ToList(); await Task.WhenAll(tasks); ``` – JamesFaix Jun 10 '21 at 20:13
  • Is it required to impose any concurrency or rate limitations to the whole operation? A concurrency limit denotes how many operations can be concurrently *in-flight* (the duration of the operations matters). A rate limit denotes how many operations can be *started* during any sliding time window (the duration of the operations is irrelevant). – Theodor Zoulias Jun 10 '21 at 20:21
  • Please define "clean" in objective terms. Otherwise, this is an exercise in listing all of the ways to do something. – Heretic Monkey Jun 10 '21 at 20:22
  • 2
    Between your Option 1 and Option 2, I would be more concerned with behavior difference than syntactic cleanness. – JamesFaix Jun 10 '21 at 20:24
  • @heretic-monkey I have refined that requirement in the post. You're right, "clean" is not a good term (but I was hoping that it would be broadly understood.) – Matt W Jun 10 '21 at 20:27
  • 3
    You can reject the "Option 1" without further ado. The `Parallel.ForEach` [is not async friendly](https://stackoverflow.com/questions/67915707/unexpected-different-results-using-task-whenall-and-parallel-foreach-with-an-ent/67923412#67923412). – Theodor Zoulias Jun 10 '21 at 20:27
  • @jamesfaix What do you mean by behaviour difference? AFAIK they would produce the same result, unless I'm missing something about the end product? – Matt W Jun 10 '21 at 20:28
  • @TheodorZoulias Yes, the compiler does not like it, either. – Matt W Jun 10 '21 at 20:28
  • @TheodorZoulias There is no requirement to impost concurrency or rate limitations. Am I implying that there is? If so, I have missed that. – Matt W Jun 10 '21 at 20:29
  • 1
    You mentioned in [a comment](https://stackoverflow.com/questions/67927835/cleanest-async-version-of-this-foreach#comment120063870_67927835) that "the DB inserts perform properly", by adding a 100 msec delay between operations. – Theodor Zoulias Jun 10 '21 at 20:34
  • @TheodorZoulias There may be due to performance of the client library. I will try it without. 700 records should be enough to test the concern. – Matt W Jun 10 '21 at 20:37
  • Does this answer your question? [Use Parallel.For in batches in dotnet core](https://stackoverflow.com/questions/67656840/use-parallel-for-in-batches-in-dotnet-core) before I copy my answer to this question it maybe helps you :) – Martin Jun 10 '21 at 22:29

1 Answers1

2

Here are some basic changes I would make to start off with. There may be other improvements you can make from there.

  1. Avoid Thread.Sleep in async code. Task.Delay is the async equivalent.
  2. Create many tasks before awaiting any. The runtime will try to run some concurrently if it can.
private async Task<int> LoadAutocomplete2(IList<Item> resp)
{
    var client = new AmazonDynamoDBClient();

    var tasks = resp.Select(async item =>
    {
        var request = new PutItemRequest
        {
            TableName = EnvironmentHelper.DynamoTableName,
            Item = new Dictionary<string, AttributeValue>()
            {
                { "LANGUAGE", new AttributeValue { S = item.LANGUAGE }},
                { "COUNTRY", new AttributeValue { S = item.COUNTRY }}
            }
        };

        var result = await client.PutItemAsync(request);
        await Task.Delay(100);
        return result;
    })
    .ToList(); // Make sure to materialize the IEnumerable!

    await Task.WhenAll(tasks);
}
JamesFaix
  • 8,050
  • 9
  • 37
  • 73
  • Thank you @jamesfaix - this has improved the execution time from 1:20 to 0:03 :) – Matt W Jun 11 '21 at 05:41
  • Why is it important to materialise the `tasks` IEnumerable? – Matt W Jun 11 '21 at 06:30
  • Is it because of the potential to accidentally enumerate the list multiple times during the `WhenAll` check would cause re-processing of items in the list? (This explanation is my derivation from this post: https://stackoverflow.com/a/60447378/71376) – Matt W Jun 11 '21 at 06:38
  • 2
    That was my thinking. Reading this again, it may be okay without `ToList` because `WhenAll` is the only thing iterating the collection, but I would be cautious if there are any other uses of the collection. It would be easy for a multiple iteration bug to sneak in. – JamesFaix Jun 11 '21 at 16:08