Based on some questions on SO, mainly this one: Throttling asynchronous tasks

I am using SemaphoreSlim to throttle concurrent requests across a range of methods in my application. Most of these methods take a list of IDs and fetch one byte array per ID from the web, concurrently. The implementation looks like this:

using (var semaphore = new SemaphoreSlim(MaxConcurrency))
{
    var tasks = fileMetadata.GroupBy(x => x.StorageType).Select(async storageTypeFileMetadata =>
    {
        await semaphore.WaitAsync();
        try
        {
            var fileManager = FileManagerFactory.CreateFileManager((StorageType)storageTypeFileMetadata.Key);
            await fileManager.UpdateFilesAsync(storageTypeFileMetadata);
        }
        finally
        {
            semaphore.Release();
        }
    });

    await Task.WhenAll(tasks);
}

Is there a way to factor the semaphore code out into a reusable method and pass in the work I need done, so I don't have to rewrite the semaphore code each time? The only differences among the methods using this pattern are the list being iterated and the work done inside the try block.

I am thinking of something like passing list.Select(x => my task method with my work in it) to a method that contains all of the semaphore wrapper code.

mameesh
  • So, in order to save a few keystrokes at development time, you're willing to do delegate instantiation and invocation at run time. Is that it? – Paulo Morgado Oct 06 '17 at 22:11
  • The main reason is, first, curiosity, and second, to avoid having 20+ methods wrapped in the exact same code, which makes them far harder to read since the main functionality of each method is 2 lines. – mameesh Oct 07 '17 at 18:17

1 Answer

So I'm guessing something like:

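using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Extension
{
    // Runs `task` once per item, with at most `concurrency` invocations in flight at a time.
    public static async Task ExecuteAsync<T>(this IEnumerable<T> items, Func<T, Task> task, int concurrency)
    {
        var tasks = new List<Task>();

        using (var semaphore = new SemaphoreSlim(concurrency))
        {
            // Start one wrapper task per item; each waits on the semaphore before doing its work.
            foreach (var item in items)
            {
                tasks.Add(ExecuteInSemaphore(semaphore, task, item));
            }

            // Await inside the using block so the semaphore is not disposed while still in use.
            await Task.WhenAll(tasks);
        }
    }

    private static async Task ExecuteInSemaphore<T>(SemaphoreSlim semaphore, Func<T, Task> task, T item)
    {
        await semaphore.WaitAsync();

        try
        {
            await task(item);
        }
        finally
        {
            // Always free the slot, even if the work throws.
            semaphore.Release();
        }
    }
}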

Then you would use it like:

await fileMetadata.GroupBy(x => x.StorageType).ExecuteAsync(storageTypeFileMetadata =>
{
    var fileManager = FileManagerFactory.CreateFileManager((StorageType)storageTypeFileMetadata.Key);
    return fileManager.UpdateFilesAsync(storageTypeFileMetadata);
}, 4);
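
The question also mentions methods that get a byte array back per ID. The same idea could plausibly be extended with an overload that returns results. This is only a minimal sketch, not part of the code above: the names ThrottledExtensions, SelectAsync and RunInSemaphore are hypothetical, chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledExtensions // hypothetical class name
{
    // Hypothetical overload: runs `selector` for every item with at most
    // `concurrency` invocations in flight, and returns the results in input order.
    public static async Task<TResult[]> SelectAsync<T, TResult>(
        this IEnumerable<T> items, Func<T, Task<TResult>> selector, int concurrency)
    {
        using (var semaphore = new SemaphoreSlim(concurrency))
        {
            // ToList() forces enumeration, so every wrapper task is created before awaiting.
            var tasks = items.Select(item => RunInSemaphore(semaphore, selector, item)).ToList();

            // Task.WhenAll returns the results in the same order as the supplied tasks.
            return await Task.WhenAll(tasks);
        }
    }

    private static async Task<TResult> RunInSemaphore<T, TResult>(
        SemaphoreSlim semaphore, Func<T, Task<TResult>> selector, T item)
    {
        await semaphore.WaitAsync();
        try
        {
            return await selector(item);
        }
        finally
        {
            // Always free the slot, even if the selector throws.
            semaphore.Release();
        }
    }
}
```

Assuming some DownloadAsync(id) method that returns Task<byte[]> (hypothetical), a call would look like: byte[][] files = await ids.SelectAsync(id => DownloadAsync(id), 4);
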
Kevin Gosse
  • Still testing this, but it looks like async needs to be added before the lambda: .ExecuteAsync(async storageTypeFileMetadata => – mameesh Oct 09 '17 at 17:43
  • @mameesh Only if you await something in it. In the example I gave, that is not necessary. – Kevin Gosse Oct 10 '17 at 06:51
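
To illustrate the point made in the last two comments, here is a small sketch of the two lambda forms. Both are valid Func<T, Task> values and could be passed to a method with that parameter type. LambdaForms and WorkAsync are hypothetical stand-ins, with WorkAsync playing the role of UpdateFilesAsync.

```csharp
using System;
using System.Threading.Tasks;

public static class LambdaForms // hypothetical, for illustration only
{
    // Hypothetical stand-in for an asynchronous operation such as UpdateFilesAsync.
    public static Task WorkAsync(int id) => Task.Delay(10);

    // Returns the task directly: nothing is awaited inside the lambda,
    // so the async modifier is not needed.
    public static readonly Func<int, Task> Direct = id => WorkAsync(id);

    // Awaits inside the lambda in order to run code afterwards,
    // so the async modifier is required.
    public static readonly Func<int, Task> Awaiting = async id =>
    {
        await WorkAsync(id);
        Console.WriteLine($"finished {id}"); // runs only after WorkAsync completes
    };
}
```

Either way, the caller receives a Task that completes when the work is done, so the throttling wrapper behaves the same with both forms.
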